]> Creatis software - creaImageIO.git/blob - src2/creaImageIOMultiThreadImageReader.cpp
thread bug correction
[creaImageIO.git] / src2 / creaImageIOMultiThreadImageReader.cpp
1 #include <creaImageIOMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 #include <wx/utils.h>
4 #include <creaImageIOSystem.h>
5
6 #include <creaImageIOGimmick.h>
7 #ifdef _DEBUG
8 #define new DEBUG_NEW
9 #endif
10 namespace creaImageIO
11 {
12
13   //=====================================================================
14   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
15   ( const std::string& filename,
16     EventType type,
17     vtkImageData* image)
18   {
19     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
20
21     this->OnMultiThreadImageReaderEvent(filename,type,image);
22   }
23   //=====================================================================
24
25   //=====================================================================
26   class ThreadedImageReader: public wxThread
27   {
28   public:
29     ThreadedImageReader(MultiThreadImageReader* tir) :
30       mMultiThreadImageReader(tir)
31     {}
32
33     void* Entry();
34     void  OnExit();
35
36     vtkImageData* Read(const std::string& filename);
37     
38         struct deleter
39         {
40                 void operator()(ThreadedImageReader* p)
41                 {
42                         p->Delete();
43                 }
44         };
45         friend struct deleter;
46
47
48   private:
49     ImageReader mReader;
50     MultiThreadImageReader* mMultiThreadImageReader;
51         
52   };
53
54   //=====================================================================
55
56   
57   //=====================================================================
58   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
59     : //mDoNotSignal(false),
60       mReader(0),
61       mTotalMem(0),
62       mTotalMemMax(1000000)
63   {
64     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
65     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
66
67           mDone = false;
68     // Create the threads
69     for (int i=0; i<number_of_threads; i++) 
70       {
71                   //ThreadedImageReader* t = new ThreadedImageReader(this);
72                   boost::shared_ptr<ThreadedImageReader> t(new ThreadedImageReader(this));//, ThreadedImageReader::deleter());
73         mThreadedImageReaderList.push_back(t);
74          std::cout << "  ===> Thread "<<i
75                       <<" successfully added"<< std::endl;
76       }
77     mNumberOfThreadedReadersRunning = 0;
78     // Init the queue
79     mQueue.set(mComparator);
80     mQueue.set(mIndexer);
81     // 
82     // no thread : alloc self reader
83 //    if (number_of_threads==0)
84 //      {
85         mReader = new ImageReader();
86 //      }
87   }
88   //=====================================================================
89
90
91   //=====================================================================
92   bool MultiThreadImageReader::Start()
93   {
94
95     //    std::cout << "#### MultiThreadImageReader::Start()"
96     //                <<std::endl;
97           if (mNumberOfThreadedReadersRunning > 0) return true;
98           
99     ThreadedImageReaderListType::iterator i;
100     for (i =mThreadedImageReaderList.begin();
101          i!=mThreadedImageReaderList.end();
102          i++)
103       {
104         (*i)->Create();
105         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
106           {
107             std::cout << "ERROR starting a thread"<< std::endl;
108             return false;
109           }
110         else 
111           {
112                     std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
113                               <<" successfully created"<< std::endl;
114             
115           }
116       }
117     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
118     //    std::cout << "EO Start : #Threads running = "
119     //                << mNumberOfThreadedReadersRunning<<std::endl;
120
121     return true;
122   }
123   //=====================================================================
124
125   //=====================================================================
126   void MultiThreadImageReader::Stop()
127   { 
128 //                  std::cout << "#### MultiThreadImageReader::Stop()"
129 //            <<std::endl;
130   //  std::cout << "Sending stop order to the threads..."<<std::endl;
131           if (mDone) return;
132
133     ThreadedImageReaderListType::iterator i;
134     for (i =mThreadedImageReaderList.begin();
135          i!=mThreadedImageReaderList.end();
136          i++)
137       { std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
138                               <<" successfully stopped"<< std::endl;
139                   if((*i)->IsAlive())
140                   {
141                           (*i).reset();
142                          //(*i)->Delete();
143                   }
144       }
145    mThreadedImageReaderList.clear();
146     // Wait a little to be sure that all threads have stopped
147     // A better way to do this ?
148     //    wxMilliSleep(1000);
149     // New method : the threads generate a stop event when they have finished
150     // We wait until all threads have stopped
151 //        std::cout << "Waiting for stop signals..."<<std::endl;
152     do 
153       {
154         // Sleep a little
155                 wxMilliSleep(10);
156         // Lock
157         {
158           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
159 //                std::cout << "#Threads running = "
160 //                          << mNumberOfThreadedReadersRunning<<std::endl;
161           // Break if all readers have stopped
162           if (mNumberOfThreadedReadersRunning <= 0) 
163             {
164               break;
165             }
166         }
167       } 
168     while (true);
169 //        std::cout << "All threads stopped : OK "<<std::endl;
170
171     ImageMapType::iterator j;
172     for (j =mImages.begin();
173          j!=mImages.end();
174          ++j)
175
176       {
177         delete j->first;
178       }
179     mImages.clear();
180         mDone = true;
181   }
182   //=====================================================================
183
184   //=====================================================================
185   MultiThreadImageReader::~MultiThreadImageReader()
186   {
187     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
188     //        <<std::endl;
189     Stop();
190     if (mReader) delete mReader;
191         mThreadedImageReaderList.clear();
192   }
193   //=====================================================================
194
195   //=====================================================================
196   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
197                                                     int priority)
198   {
199     // not in unload queue : ciao
200     if (p->UnloadIndex()<0) return;
201     int old_prio = p->GetPriority();
202     if (priority > old_prio) 
203       {
204         p->SetPriority(priority);
205         mUnloadQueue.downsort(p->UnloadIndex());
206       }
207     else if ( old_prio > priority )
208       {
209         p->SetPriority(priority);
210         mUnloadQueue.upsort(p->UnloadIndex());
211      }
212   }
213   //=====================================================================
214
215   //=====================================================================
216   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
217                                         const std::string& filename, 
218                                         int priority )
219   {
220         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
221
222           if (mNumberOfThreadedReadersRunning==0)
223 //    if (mThreadedImageReaderList.size()==0) 
224       {
225         // no detached reader : use self reader
226         ImageToLoad itl(user,filename);
227         ImageMapType::iterator i = mImages.find(&itl);
228         if (i!=mImages.end())
229           {
230             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
231             // Already inserted
232             if (pitl->GetImage() != 0)
233               {
234                 // Already read
235                 pitl->SetUser(user);
236                 UpdateUnloadPriority(pitl,priority);
237                 SignalImageRead(pitl,false);
238                 return; // pitl->GetImage();
239               }
240           }
241         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
242         mImages[pitl] = 0;
243         pitl->SetImage(mReader->ReadImage(filename));
244         UpdateUnloadPriority(pitl,priority);
245         SignalImageRead(pitl,true);
246         //      return pitl->GetImage();
247         return;
248       }
249
250     ImageToLoad itl(user,filename);
251     ImageMapType::iterator i = mImages.find(&itl);
252     if (i!=mImages.end())
253       {
254         // Already inserted
255         if (i->first->GetImage() != 0)
256           {
257             // Already read : ok :signal the user
258             UpdateUnloadPriority(i->first,priority);
259             SignalImageRead(i->first,false);
260             return;
261           }
262         /// Already requested : change the priority
263         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
264         pitl->SetPriority(priority);
265         // Already in queue
266         if (pitl->Index()>=0) 
267           {
268             // Re-sort the queue
269             mQueue.upsort(pitl->Index());
270           }
271         // Not read but not in queue = being read = ok
272         else 
273           {
274             
275           }
276       }
277     else 
278       {
279         // Never requested before or unloaded 
280         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
281         mImages[pitl] = 0;
282         mQueue.insert(pitl);
283       }
284   }
285   //=====================================================================
286
287   //=====================================================================
288   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
289   (const std::string& filename,
290    MultiThreadImageReaderUser::EventType e,
291    vtkImageData* image)
292   {
293     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
294         (filename == mRequestedFilename))
295       {
296         mRequestedImage = image;
297       }
298     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
299       {
300         mNumberOfThreadedReadersRunning++;
301         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
302       }
303     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
304       {
305         
306                  mNumberOfThreadedReadersRunning--;
307         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
308       }
309   }
310   //=====================================================================
311
312   //=====================================================================
313   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
314   {
315          // Start();
316     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
317     //           <<std::endl;
318     
319     do 
320       {
321         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
322                 
323         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
324         //             <<"') lock ok"
325         //               <<std::endl;
326     
327         //                if (mNumberOfThreadedReadersRunning==0)
328         //      if (mThreadedImageReaderList.size()==0)
329         if (true)
330           {
331             ImageToLoad itl(this,filename);
332             ImageMapType::iterator i = mImages.find(&itl);
333             if (i!=mImages.end())
334               {
335                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
336                 // Already inserted
337                 if (pitl->GetImage() != 0)
338                   {
339                     // Already read
340                     UpdateUnloadPriority(pitl,
341                                          GetMaximalPriorityWithoutLocking()+1);
342                     return pitl->GetImage();
343                   }
344               }
345             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
346             mImages[pitl] = 0;
347             pitl->SetImage(mReader->ReadImage(filename));
348             UpdateUnloadPriority(pitl,
349                                  GetMaximalPriorityWithoutLocking()+1);
350             return pitl->GetImage();
351           }
352
353         /*      
354         mRequestedFilename = filename;
355         mRequestedImage = 0;
356         ImageToLoad itl(this,filename);
357         ImageMapType::iterator i = mImages.find(&itl);
358         if (i!=mImages.end())
359           {
360             // Already inserted in queue
361             if (i->first->GetImage() != 0)
362               {
363                 // Already read : ok : return it 
364                 return i->first->GetImage();
365               }
366             /// Already requested : change the priority
367               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
368               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
369               pitl->SetUser( this );
370               // Already in queue
371               if (pitl->Index()>=0) 
372                 {
373                   // Re-sort the queue
374                   mQueue.upsort(pitl->Index());
375                 }
376               // Not read but not in queue = being read = ok
377               else 
378                 {
379                   pitl->SetUser( this );
380                 }
381           }
382         else 
383           {
384             
385             // Never requested before or unloaded 
386             ImageToLoadPtr pitl = 
387               new ImageToLoad(this,filename,
388                               GetMaximalPriorityWithoutLocking() + 1);
389             mImages[pitl] = 0;
390             mQueue.insert(pitl);
391           }
392         */
393       }
394     while (0);
395
396     //    std::cout << "Waiting..."<<std::endl;
397
398     /*
399     // Waiting that it is read
400     int n = 0;
401     do 
402       {
403         //      std::cout << n++ << std::endl;
404         wxMilliSleep(10);
405         do 
406           {
407             //      wxMutexLocker lock(mMutex);
408             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
409             if (mRequestedImage!=0) 
410               {
411                 return mRequestedImage;
412               } 
413           }
414         while (0);
415       }
416     while (true);
417     // 
418     */
419   }
420   //=====================================================================
421   
422   //=====================================================================
423   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
424                                                bool purge)
425   {
426     
427 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
428     //    std::cout << "this="<<this <<std::endl;
429     //    std::cout << "user="<<p->GetUser() <<std::endl;
430
431     if ( p->GetUser() == this ) 
432       GetMultiThreadImageReaderUserMutex().Unlock();
433
434     p->GetUser()->MultiThreadImageReaderSendEvent
435       (p->GetFilename(),
436        MultiThreadImageReaderUser::ImageLoaded,
437        p->GetImage());
438
439     /*
440       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
441       BUGGY : TO FIX 
442     */
443     if (!purge)  return;
444     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
445
446     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
447            
448     mUnloadQueue.insert(p);
449     p->GetImage()->UpdateInformation();
450     p->GetImage()->PropagateUpdateExtent();
451     long ImMem = p->GetImage()->GetEstimatedMemorySize();
452     mTotalMem += ImMem;
453
454     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
455     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
456
457     //  return;
458
459     while (mTotalMem > mTotalMemMax)
460       {
461         GimmickMessage(5,
462                        "   ! Exceeded max of "
463                        << mTotalMemMax << " Ko : unloading oldest image ... "
464                        << std::endl);
465         if ( mUnloadQueue.size() <= 1 ) 
466           {
467              GimmickMessage(5,
468                             "   Only one image : cannot load AND unload it !!"
469                             <<std::endl);
470             break; 
471             
472           }
473         ImageToLoadPtr unload = mUnloadQueue.remove_top();
474         MultiThreadImageReaderUser* user = unload->GetUser();
475
476         /*
477         if ((user!=0)&&(user!=this)) 
478           {
479             user->GetMultiThreadImageReaderUserMutex().Lock();
480           }
481         */
482
483         std::string filename = unload->GetFilename();
484
485         GimmickMessage(5,"'" << filename << "'" << std::endl);
486         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
487
488         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
489
490         if (user!=0) 
491           {
492             //      std::cout << "unlock..."<<std::endl;
493             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
494             //      std::cout << "event"<<std::endl;
495             user->MultiThreadImageReaderSendEvent
496               (filename,
497                MultiThreadImageReaderUser::ImageUnloaded,
498                0);
499             //      std::cout << "event ok"<<std::endl;
500           }     
501
502         if (unload->Index()>=0)
503           {
504             // GimmickMessage(5,"still in queue"<<std::endl);
505           }
506         unload->Index() = -1;
507
508
509         ImageMapType::iterator it = mImages.find(unload);
510         if (it!=mImages.end())
511           {
512             mImages.erase(it);
513           }
514         //          std::cout << "delete..."<<std::endl;
515         delete unload;
516         //          std::cout << "delete ok."<<std::endl;
517
518       }
519   }
520   //=====================================================================
521
522   //=====================================================================
523   int MultiThreadImageReader::GetMaximalPriority()
524   { 
525     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
526     return GetMaximalPriorityWithoutLocking();
527   }
528   //=====================================================================
529
530
531   //=====================================================================
532   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
533   { 
534     long max = 0;
535     if (mQueue.size()>0) 
536       {
537         max = mQueue.top()->GetPriority();
538       }
539     if (mUnloadQueue.size()>0)
540       {
541         int max2 = mUnloadQueue.top()->GetPriority();
542         if (max2>max) max=max2;
543       }
544     return max;
545   }
546   //=====================================================================
547
548
549   //=====================================================================
550   //=====================================================================
551   //=====================================================================
552   //=====================================================================
553
554   //=====================================================================
555   void*  ThreadedImageReader::Entry()
556   {
557     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
558     //                << std::endl;
559
560     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
561       ("",
562        MultiThreadImageReaderUser::ThreadedReaderStarted,
563        0);
564
565     // While was not deleted 
566     while (!TestDestroy())
567       {
568                 //std::cout << "### Thread "<<GetCurrentId()<<" still alive"  << std::endl;
569           
570         // Lock the mutex
571         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
572         //mMutex.Lock();
573         // If image in queue
574         if (mMultiThreadImageReader->mQueue.size()>0)
575           {
576             MultiThreadImageReader::ImageToLoadPtr i = 
577               mMultiThreadImageReader->mQueue.remove_top();
578
579             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
580             //mMutex.Unlock();
581
582             
583             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
584             //                << i->GetFilename() << "'" << std::endl;
585             
586             // Do the job
587             vtkImageData* im = Read(i->GetFilename());
588
589             // Store it in the map
590             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
591             //mMutex.Lock();
592             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
593             MultiThreadImageReader::ImageMapType::iterator it = 
594               mMultiThreadImageReader->mImages.find(&itl);
595             MultiThreadImageReader::ImageToLoadPtr 
596               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
597               (it->first);
598             pitl->SetImage(im);
599             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
600             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
601             
602             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
603             //                << i->GetFilename() << "' : DONE" << std::endl;
604             
605           }
606         else 
607           {
608             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
609             //mMutex.Unlock();
610             // Wait a little to avoid blocking 
611             Sleep(10);
612           }
613       };
614     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
615     //                << std::endl;
616        
617     return 0;
618   }
619   //=====================================================================
620
621   //=====================================================================
622   void ThreadedImageReader::OnExit()
623   {
624     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
625       ("",
626        MultiThreadImageReaderUser::ThreadedReaderStopped,
627        0);
628   }
629   //=====================================================================
630
631   //=====================================================================
632   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
633   {
634     return mReader.ReadImage(filename);
635   }
636   //=====================================================================
637
638 } // namespace creaImageIO