]> Creatis software - creaImageIO.git/blob - src2/creaImageIOMultiThreadImageReader.cpp
7daf7403c9429406558d9fe1955ec48a4ee320c6
[creaImageIO.git] / src2 / creaImageIOMultiThreadImageReader.cpp
1 #include <creaImageIOMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 #include <wx/utils.h>
4 #include <creaImageIOSystem.h>
5
6 namespace creaImageIO
7 {
8
9   //=====================================================================
10   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
11   ( const std::string& filename,
12     EventType type,
13     vtkImageData* image)
14   {
15     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
16
17     this->OnMultiThreadImageReaderEvent(filename,type,image);
18   }
19   //=====================================================================
20
21   //=====================================================================
22   class ThreadedImageReader: public wxThread
23   {
24   public:
25     ThreadedImageReader(MultiThreadImageReader* tir) :
26       mMultiThreadImageReader(tir)
27     {}
28
29     void* Entry();
30     void  OnExit();
31
32     vtkImageData* Read(const std::string& filename);
33     
34
35   private:
36     ImageReader mReader;
37     MultiThreadImageReader* mMultiThreadImageReader;
38         
39   };
40   //=====================================================================
41
42   
43   //=====================================================================
44   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
45     : //mDoNotSignal(false),
46       mReader(0),
47       mTotalMem(0),
48       mTotalMemMax(1000000)
49   {
50     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
51     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
52
53     // Create the threads
54     for (int i=0; i<number_of_threads; i++) 
55       {
56         //ThreadedImageReader* t = new ThreadedImageReader(this);
57         boost::shared_ptr<ThreadedImageReader> t(new ThreadedImageReader(this));
58         mThreadedImageReaderList.push_back(t);
59          std::cout << "  ===> Thread "<<i
60                               <<" successfully added"<< std::endl;
61       }
62     mNumberOfThreadedReadersRunning = 0;
63     // Init the queue
64     mQueue.set(mComparator);
65     mQueue.set(mIndexer);
66     // 
67     // no thread : alloc self reader
68 //    if (number_of_threads==0)
69 //      {
70         mReader = new ImageReader();
71 //      }
72   }
73   //=====================================================================
74
75
76   //=====================================================================
77   bool MultiThreadImageReader::Start()
78   {
79
80     //    std::cout << "#### MultiThreadImageReader::Start()"
81     //                <<std::endl;
82           if (mNumberOfThreadedReadersRunning > 0) return true;
83           
84     ThreadedImageReaderListType::iterator i;
85     for (i =mThreadedImageReaderList.begin();
86          i!=mThreadedImageReaderList.end();
87          i++)
88       {
89         (*i)->Create();
90         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
91           {
92             std::cout << "ERROR starting a thread"<< std::endl;
93             return false;
94           }
95         else 
96           {
97                     std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
98                               <<" successfully created"<< std::endl;
99             
100           }
101       }
102     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
103     //    std::cout << "EO Start : #Threads running = "
104     //                << mNumberOfThreadedReadersRunning<<std::endl;
105
106     return true;
107   }
108   //=====================================================================
109
110   //=====================================================================
111   void MultiThreadImageReader::Stop()
112   { 
113 //                  std::cout << "#### MultiThreadImageReader::Stop()"
114 //            <<std::endl;
115   //  std::cout << "Sending stop order to the threads..."<<std::endl;
116
117     ThreadedImageReaderListType::iterator i;
118     for (i =mThreadedImageReaderList.begin();
119          i!=mThreadedImageReaderList.end();
120          i++)
121       { std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
122                               <<" successfully stopped"<< std::endl;
123                   if((*i)->IsAlive())
124                   {
125                         (*i)->Delete();
126                   }
127       }
128     mThreadedImageReaderList.clear();
129     // Wait a little to be sure that all threads have stopped
130     // A better way to do this ?
131     //    wxMilliSleep(1000);
132     // New method : the threads generate a stop event when they have finished
133     // We wait until all threads have stopped
134 //        std::cout << "Waiting for stop signals..."<<std::endl;
135     do 
136       {
137         // Sleep a little
138                 wxMilliSleep(10);
139         // Lock
140         {
141           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
142 //                std::cout << "#Threads running = "
143 //                          << mNumberOfThreadedReadersRunning<<std::endl;
144           // Break if all readers have stopped
145           if (mNumberOfThreadedReadersRunning <= 0) 
146             {
147               break;
148             }
149         }
150       } 
151     while (true);
152 //        std::cout << "All threads stopped : OK "<<std::endl;
153
154     ImageMapType::iterator j;
155     for (j =mImages.begin();
156          j!=mImages.end();
157          ++j)
158
159       {
160         delete j->first;
161       }
162     mImages.clear();
163   }
164   //=====================================================================
165
166   //=====================================================================
167   MultiThreadImageReader::~MultiThreadImageReader()
168   {
169     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
170     //        <<std::endl;
171     Stop();
172     if (mReader) delete mReader;
173         mThreadedImageReaderList.clear();
174   }
175   //=====================================================================
176
177   //=====================================================================
178   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
179                                                     int priority)
180   {
181     // not in unload queue : ciao
182     if (p->UnloadIndex()<0) return;
183     int old_prio = p->GetPriority();
184     if (priority > old_prio) 
185       {
186         p->SetPriority(priority);
187         mUnloadQueue.downsort(p->UnloadIndex());
188       }
189     else if ( old_prio > priority )
190       {
191         p->SetPriority(priority);
192         mUnloadQueue.upsort(p->UnloadIndex());
193      }
194   }
195   //=====================================================================
196
197   //=====================================================================
198   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
199                                         const std::string& filename, 
200                                         int priority )
201   {
202         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
203     
204           if (mNumberOfThreadedReadersRunning==0)
205 //    if (mThreadedImageReaderList.size()==0) 
206       {
207         // no detached reader : use self reader
208         ImageToLoad itl(user,filename);
209         ImageMapType::iterator i = mImages.find(&itl);
210         if (i!=mImages.end())
211           {
212             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
213             // Already inserted
214             if (pitl->GetImage() != 0)
215               {
216                 // Already read
217                 pitl->SetUser(user);
218                 UpdateUnloadPriority(pitl,priority);
219                 SignalImageRead(pitl,false);
220                 return; // pitl->GetImage();
221               }
222           }
223         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
224         mImages[pitl] = 0;
225         pitl->SetImage(mReader->ReadImage(filename));
226         UpdateUnloadPriority(pitl,priority);
227         SignalImageRead(pitl,true);
228         //      return pitl->GetImage();
229         return;
230       }
231
232
233     ImageToLoad itl(user,filename);
234     ImageMapType::iterator i = mImages.find(&itl);
235     if (i!=mImages.end())
236       {
237         // Already inserted
238         if (i->first->GetImage() != 0)
239           {
240             // Already read : ok :signal the user
241             UpdateUnloadPriority(i->first,priority);
242             SignalImageRead(i->first,false);
243             return;
244           }
245         /// Already requested : change the priority
246         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
247         pitl->SetPriority(priority);
248         // Already in queue
249         if (pitl->Index()>=0) 
250           {
251             // Re-sort the queue
252             mQueue.upsort(pitl->Index());
253           }
254         // Not read but not in queue = being read = ok
255         else 
256           {
257             
258           }
259       }
260     else 
261       {
262         // Never requested before or unloaded 
263         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
264         mImages[pitl] = 0;
265         mQueue.insert(pitl);
266       }
267   }
268   //=====================================================================
269
270   //=====================================================================
271   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
272   (const std::string& filename,
273    MultiThreadImageReaderUser::EventType e,
274    vtkImageData* image)
275   {
276     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
277         (filename == mRequestedFilename))
278       {
279         mRequestedImage = image;
280       }
281     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
282       {
283         mNumberOfThreadedReadersRunning++;
284         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
285       }
286     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
287       {
288         
289                  mNumberOfThreadedReadersRunning--;
290         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
291       }
292   }
293   //=====================================================================
294
295   //=====================================================================
296   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
297   {
298          // Start();
299     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
300     //           <<std::endl;
301     
302     do 
303       {
304         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
305                 
306         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
307         //             <<"') lock ok"
308         //               <<std::endl;
309     
310         //                if (mNumberOfThreadedReadersRunning==0)
311         //      if (mThreadedImageReaderList.size()==0)
312         if (true)
313           {
314             ImageToLoad itl(this,filename);
315             ImageMapType::iterator i = mImages.find(&itl);
316             if (i!=mImages.end())
317               {
318                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
319                 // Already inserted
320                 if (pitl->GetImage() != 0)
321                   {
322                     // Already read
323                     UpdateUnloadPriority(pitl,
324                                          GetMaximalPriorityWithoutLocking()+1);
325                     return pitl->GetImage();
326                   }
327               }
328             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
329             mImages[pitl] = 0;
330             pitl->SetImage(mReader->ReadImage(filename));
331             UpdateUnloadPriority(pitl,
332                                  GetMaximalPriorityWithoutLocking()+1);
333             return pitl->GetImage();
334           }
335
336         /*      
337         mRequestedFilename = filename;
338         mRequestedImage = 0;
339         ImageToLoad itl(this,filename);
340         ImageMapType::iterator i = mImages.find(&itl);
341         if (i!=mImages.end())
342           {
343             // Already inserted in queue
344             if (i->first->GetImage() != 0)
345               {
346                 // Already read : ok : return it 
347                 return i->first->GetImage();
348               }
349             /// Already requested : change the priority
350               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
351               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
352               pitl->SetUser( this );
353               // Already in queue
354               if (pitl->Index()>=0) 
355                 {
356                   // Re-sort the queue
357                   mQueue.upsort(pitl->Index());
358                 }
359               // Not read but not in queue = being read = ok
360               else 
361                 {
362                   pitl->SetUser( this );
363                 }
364           }
365         else 
366           {
367             
368             // Never requested before or unloaded 
369             ImageToLoadPtr pitl = 
370               new ImageToLoad(this,filename,
371                               GetMaximalPriorityWithoutLocking() + 1);
372             mImages[pitl] = 0;
373             mQueue.insert(pitl);
374           }
375         */
376       }
377     while (0);
378
379     //    std::cout << "Waiting..."<<std::endl;
380
381     /*
382     // Waiting that it is read
383     int n = 0;
384     do 
385       {
386         //      std::cout << n++ << std::endl;
387         wxMilliSleep(10);
388         do 
389           {
390             //      wxMutexLocker lock(mMutex);
391             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
392             if (mRequestedImage!=0) 
393               {
394                 return mRequestedImage;
395               } 
396           }
397         while (0);
398       }
399     while (true);
400     // 
401     */
402   }
403   //=====================================================================
404   
405   //=====================================================================
406   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
407                                                bool purge)
408   {
409     
410 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
411     //    std::cout << "this="<<this <<std::endl;
412     //    std::cout << "user="<<p->GetUser() <<std::endl;
413
414     if ( p->GetUser() == this ) 
415       GetMultiThreadImageReaderUserMutex().Unlock();
416
417     p->GetUser()->MultiThreadImageReaderSendEvent
418       (p->GetFilename(),
419        MultiThreadImageReaderUser::ImageLoaded,
420        p->GetImage());
421
422     /*
423       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
424       BUGGY : TO FIX 
425     */
426     if (!purge)  return;
427     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
428
429     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
430            
431     mUnloadQueue.insert(p);
432     p->GetImage()->UpdateInformation();
433     p->GetImage()->PropagateUpdateExtent();
434     long ImMem = p->GetImage()->GetEstimatedMemorySize();
435     mTotalMem += ImMem;
436
437     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
438     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
439
440     //  return;
441
442     while (mTotalMem > mTotalMemMax)
443       {
444         GimmickMessage(5,
445                        "   ! Exceeded max of "
446                        << mTotalMemMax << " Ko : unloading oldest image ... "
447                        << std::endl);
448         if ( mUnloadQueue.size() <= 1 ) 
449           {
450              GimmickMessage(5,
451                             "   Only one image : cannot load AND unload it !!"
452                             <<std::endl);
453             break; 
454             
455           }
456         ImageToLoadPtr unload = mUnloadQueue.remove_top();
457         MultiThreadImageReaderUser* user = unload->GetUser();
458
459         /*
460         if ((user!=0)&&(user!=this)) 
461           {
462             user->GetMultiThreadImageReaderUserMutex().Lock();
463           }
464         */
465
466         std::string filename = unload->GetFilename();
467
468         GimmickMessage(5,"'" << filename << "'" << std::endl);
469         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
470
471         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
472
473
474         if (user!=0) 
475           {
476             //      std::cout << "unlock..."<<std::endl;
477             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
478             //      std::cout << "event"<<std::endl;
479             user->MultiThreadImageReaderSendEvent
480               (filename,
481                MultiThreadImageReaderUser::ImageUnloaded,
482                0);
483             //      std::cout << "event ok"<<std::endl;
484
485           }     
486
487         if (unload->Index()>=0)
488           {
489             // GimmickMessage(5,"still in queue"<<std::endl);
490           }
491         unload->Index() = -1;
492
493
494         ImageMapType::iterator it = mImages.find(unload);
495         if (it!=mImages.end())
496           {
497             mImages.erase(it);
498           }
499         //          std::cout << "delete..."<<std::endl;
500         delete unload;
501         //          std::cout << "delete ok."<<std::endl;
502
503       }
504     
505   
506   }
507   //=====================================================================
508
509   //=====================================================================
510   int MultiThreadImageReader::GetMaximalPriority()
511   { 
512     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
513     return GetMaximalPriorityWithoutLocking();
514   }
515   //=====================================================================
516
517
518   //=====================================================================
519   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
520   { 
521     long max = 0;
522     if (mQueue.size()>0) 
523       {
524         max = mQueue.top()->GetPriority();
525       }
526     if (mUnloadQueue.size()>0)
527       {
528         int max2 = mUnloadQueue.top()->GetPriority();
529         if (max2>max) max=max2;
530       }
531     return max;
532   }
533   //=====================================================================
534
535
536   //=====================================================================
537   //=====================================================================
538   //=====================================================================
539   //=====================================================================
540
541   //=====================================================================
542   void*  ThreadedImageReader::Entry()
543   {
544     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
545     //                << std::endl;
546
547     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
548       ("",
549        MultiThreadImageReaderUser::ThreadedReaderStarted,
550        0);
551
552     // While was not deleted 
553     while (!TestDestroy())
554       {
555                 //std::cout << "### Thread "<<GetCurrentId()<<" still alive"  << std::endl;
556           
557         // Lock the mutex
558         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
559         //mMutex.Lock();
560         // If image in queue
561         if (mMultiThreadImageReader->mQueue.size()>0)
562           {
563             MultiThreadImageReader::ImageToLoadPtr i = 
564               mMultiThreadImageReader->mQueue.remove_top();
565
566             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
567             //mMutex.Unlock();
568
569             
570             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
571             //                << i->GetFilename() << "'" << std::endl;
572             
573             // Do the job
574             vtkImageData* im = Read(i->GetFilename());
575
576             // Store it in the map
577             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
578             //mMutex.Lock();
579             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
580             MultiThreadImageReader::ImageMapType::iterator it = 
581               mMultiThreadImageReader->mImages.find(&itl);
582             MultiThreadImageReader::ImageToLoadPtr 
583               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
584               (it->first);
585             pitl->SetImage(im);
586             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
587             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
588             
589             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
590             //                << i->GetFilename() << "' : DONE" << std::endl;
591             
592           }
593         else 
594           {
595             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
596             //mMutex.Unlock();
597             // Wait a little to avoid blocking 
598             Sleep(10);
599           }
600       };
601     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
602     //                << std::endl;
603        
604     return 0;
605   }
606   //=====================================================================
607
608   //=====================================================================
609   void ThreadedImageReader::OnExit()
610   {
611     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
612       ("",
613        MultiThreadImageReaderUser::ThreadedReaderStopped,
614        0);
615   }
616   //=====================================================================
617
618   //=====================================================================
619   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
620   {
621     return mReader.ReadImage(filename);
622   }
623   //=====================================================================
624
625 } // namespace creaImageIO