]> Creatis software - creaImageIO.git/blob - src2/creaImageIOMultiThreadImageReader.cpp
65d2aab22f32643f59b6c921abbe5816b37c2837
[creaImageIO.git] / src2 / creaImageIOMultiThreadImageReader.cpp
1 #include <creaImageIOMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 #include <wx/utils.h>
4 #include <creaImageIOSystem.h>
5
6
7 namespace creaImageIO
8 {
9
10   //=====================================================================
11   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
12   ( const std::string& filename,
13     EventType type,
14     vtkImageData* image)
15   {
16     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
17
18     this->OnMultiThreadImageReaderEvent(filename,type,image);
19   }
20   //=====================================================================
21
22   //=====================================================================
23   class ThreadedImageReader: public wxThread
24   {
25   public:
26     ThreadedImageReader(MultiThreadImageReader* tir) :
27       mMultiThreadImageReader(tir)
28     {}
29
30     void* Entry();
31     void  OnExit();
32
33     vtkImageData* Read(const std::string& filename);
34     
35         struct deleter
36         {
37                 void operator()(ThreadedImageReader* p)
38                 {
39                         p->Delete();
40                 }
41         };
42         friend struct deleter;
43
44
45   private:
46     ImageReader mReader;
47     MultiThreadImageReader* mMultiThreadImageReader;
48         
49   };
50
51  
52   //=====================================================================
53
54   
55   //=====================================================================
56   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
57     : //mDoNotSignal(false),
58       mReader(0),
59       mTotalMem(0),
60       mTotalMemMax(1000000)
61   {
62     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
63     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
64
65     // Create the threads
66     for (int i=0; i<number_of_threads; i++) 
67       {
68                   //ThreadedImageReader* t = new ThreadedImageReader(this);
69                   boost::shared_ptr<ThreadedImageReader> t(new ThreadedImageReader(this), ThreadedImageReader::deleter());
70         mThreadedImageReaderList.push_back(t);
71          std::cout << "  ===> Thread "<<i
72                               <<" successfully added"<< std::endl;
73       }
74     mNumberOfThreadedReadersRunning = 0;
75     // Init the queue
76     mQueue.set(mComparator);
77     mQueue.set(mIndexer);
78     // 
79     // no thread : alloc self reader
80 //    if (number_of_threads==0)
81 //      {
82         mReader = new ImageReader();
83 //      }
84   }
85   //=====================================================================
86
87
88   //=====================================================================
89   bool MultiThreadImageReader::Start()
90   {
91
92     //    std::cout << "#### MultiThreadImageReader::Start()"
93     //                <<std::endl;
94           if (mNumberOfThreadedReadersRunning > 0) return true;
95           
96     ThreadedImageReaderListType::iterator i;
97     for (i =mThreadedImageReaderList.begin();
98          i!=mThreadedImageReaderList.end();
99          i++)
100       {
101         (*i)->Create();
102         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
103           {
104             std::cout << "ERROR starting a thread"<< std::endl;
105             return false;
106           }
107         else 
108           {
109                     std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
110                               <<" successfully created"<< std::endl;
111             
112           }
113       }
114     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
115     //    std::cout << "EO Start : #Threads running = "
116     //                << mNumberOfThreadedReadersRunning<<std::endl;
117
118     return true;
119   }
120   //=====================================================================
121
122   //=====================================================================
123   void MultiThreadImageReader::Stop()
124   { 
125 //                  std::cout << "#### MultiThreadImageReader::Stop()"
126 //            <<std::endl;
127   //  std::cout << "Sending stop order to the threads..."<<std::endl;
128
129     ThreadedImageReaderListType::iterator i;
130     for (i =mThreadedImageReaderList.begin();
131          i!=mThreadedImageReaderList.end();
132          i++)
133       { std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
134                               <<" successfully stopped"<< std::endl;
135                   if((*i)->IsAlive())
136                   {
137                           (*i).reset();
138                          //(*i)->Delete();
139                   }
140       }
141  //  mThreadedImageReaderList.clear();
142     // Wait a little to be sure that all threads have stopped
143     // A better way to do this ?
144     //    wxMilliSleep(1000);
145     // New method : the threads generate a stop event when they have finished
146     // We wait until all threads have stopped
147 //        std::cout << "Waiting for stop signals..."<<std::endl;
148     do 
149       {
150         // Sleep a little
151                 wxMilliSleep(10);
152         // Lock
153         {
154           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
155 //                std::cout << "#Threads running = "
156 //                          << mNumberOfThreadedReadersRunning<<std::endl;
157           // Break if all readers have stopped
158           if (mNumberOfThreadedReadersRunning <= 0) 
159             {
160               break;
161             }
162         }
163       } 
164     while (true);
165 //        std::cout << "All threads stopped : OK "<<std::endl;
166
167     ImageMapType::iterator j;
168     for (j =mImages.begin();
169          j!=mImages.end();
170          ++j)
171
172       {
173         delete j->first;
174       }
175     mImages.clear();
176   }
177   //=====================================================================
178
179   //=====================================================================
180   MultiThreadImageReader::~MultiThreadImageReader()
181   {
182     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
183     //        <<std::endl;
184     Stop();
185     if (mReader) delete mReader;
186         mThreadedImageReaderList.clear();
187   }
188   //=====================================================================
189
190   //=====================================================================
191   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
192                                                     int priority)
193   {
194     // not in unload queue : ciao
195     if (p->UnloadIndex()<0) return;
196     int old_prio = p->GetPriority();
197     if (priority > old_prio) 
198       {
199         p->SetPriority(priority);
200         mUnloadQueue.downsort(p->UnloadIndex());
201       }
202     else if ( old_prio > priority )
203       {
204         p->SetPriority(priority);
205         mUnloadQueue.upsort(p->UnloadIndex());
206      }
207   }
208   //=====================================================================
209
210   //=====================================================================
211   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
212                                         const std::string& filename, 
213                                         int priority )
214   {
215         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
216     
217           if (mNumberOfThreadedReadersRunning==0)
218 //    if (mThreadedImageReaderList.size()==0) 
219       {
220         // no detached reader : use self reader
221         ImageToLoad itl(user,filename);
222         ImageMapType::iterator i = mImages.find(&itl);
223         if (i!=mImages.end())
224           {
225             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
226             // Already inserted
227             if (pitl->GetImage() != 0)
228               {
229                 // Already read
230                 pitl->SetUser(user);
231                 UpdateUnloadPriority(pitl,priority);
232                 SignalImageRead(pitl,false);
233                 return; // pitl->GetImage();
234               }
235           }
236         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
237         mImages[pitl] = 0;
238         pitl->SetImage(mReader->ReadImage(filename));
239         UpdateUnloadPriority(pitl,priority);
240         SignalImageRead(pitl,true);
241         //      return pitl->GetImage();
242         return;
243       }
244
245
246     ImageToLoad itl(user,filename);
247     ImageMapType::iterator i = mImages.find(&itl);
248     if (i!=mImages.end())
249       {
250         // Already inserted
251         if (i->first->GetImage() != 0)
252           {
253             // Already read : ok :signal the user
254             UpdateUnloadPriority(i->first,priority);
255             SignalImageRead(i->first,false);
256             return;
257           }
258         /// Already requested : change the priority
259         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
260         pitl->SetPriority(priority);
261         // Already in queue
262         if (pitl->Index()>=0) 
263           {
264             // Re-sort the queue
265             mQueue.upsort(pitl->Index());
266           }
267         // Not read but not in queue = being read = ok
268         else 
269           {
270             
271           }
272       }
273     else 
274       {
275         // Never requested before or unloaded 
276         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
277         mImages[pitl] = 0;
278         mQueue.insert(pitl);
279       }
280   }
281   //=====================================================================
282
283   //=====================================================================
284   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
285   (const std::string& filename,
286    MultiThreadImageReaderUser::EventType e,
287    vtkImageData* image)
288   {
289     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
290         (filename == mRequestedFilename))
291       {
292         mRequestedImage = image;
293       }
294     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
295       {
296         mNumberOfThreadedReadersRunning++;
297         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
298       }
299     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
300       {
301         
302                  mNumberOfThreadedReadersRunning--;
303         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
304       }
305   }
306   //=====================================================================
307
308   //=====================================================================
309   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
310   {
311          // Start();
312     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
313     //           <<std::endl;
314     
315     do 
316       {
317         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
318                 
319         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
320         //             <<"') lock ok"
321         //               <<std::endl;
322     
323         //                if (mNumberOfThreadedReadersRunning==0)
324         //      if (mThreadedImageReaderList.size()==0)
325         if (true)
326           {
327             ImageToLoad itl(this,filename);
328             ImageMapType::iterator i = mImages.find(&itl);
329             if (i!=mImages.end())
330               {
331                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
332                 // Already inserted
333                 if (pitl->GetImage() != 0)
334                   {
335                     // Already read
336                     UpdateUnloadPriority(pitl,
337                                          GetMaximalPriorityWithoutLocking()+1);
338                     return pitl->GetImage();
339                   }
340               }
341             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
342             mImages[pitl] = 0;
343             pitl->SetImage(mReader->ReadImage(filename));
344             UpdateUnloadPriority(pitl,
345                                  GetMaximalPriorityWithoutLocking()+1);
346             return pitl->GetImage();
347           }
348
349         /*      
350         mRequestedFilename = filename;
351         mRequestedImage = 0;
352         ImageToLoad itl(this,filename);
353         ImageMapType::iterator i = mImages.find(&itl);
354         if (i!=mImages.end())
355           {
356             // Already inserted in queue
357             if (i->first->GetImage() != 0)
358               {
359                 // Already read : ok : return it 
360                 return i->first->GetImage();
361               }
362             /// Already requested : change the priority
363               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
364               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
365               pitl->SetUser( this );
366               // Already in queue
367               if (pitl->Index()>=0) 
368                 {
369                   // Re-sort the queue
370                   mQueue.upsort(pitl->Index());
371                 }
372               // Not read but not in queue = being read = ok
373               else 
374                 {
375                   pitl->SetUser( this );
376                 }
377           }
378         else 
379           {
380             
381             // Never requested before or unloaded 
382             ImageToLoadPtr pitl = 
383               new ImageToLoad(this,filename,
384                               GetMaximalPriorityWithoutLocking() + 1);
385             mImages[pitl] = 0;
386             mQueue.insert(pitl);
387           }
388         */
389       }
390     while (0);
391
392     //    std::cout << "Waiting..."<<std::endl;
393
394     /*
395     // Waiting that it is read
396     int n = 0;
397     do 
398       {
399         //      std::cout << n++ << std::endl;
400         wxMilliSleep(10);
401         do 
402           {
403             //      wxMutexLocker lock(mMutex);
404             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
405             if (mRequestedImage!=0) 
406               {
407                 return mRequestedImage;
408               } 
409           }
410         while (0);
411       }
412     while (true);
413     // 
414     */
415   }
416   //=====================================================================
417   
418   //=====================================================================
419   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
420                                                bool purge)
421   {
422     
423 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
424     //    std::cout << "this="<<this <<std::endl;
425     //    std::cout << "user="<<p->GetUser() <<std::endl;
426
427     if ( p->GetUser() == this ) 
428       GetMultiThreadImageReaderUserMutex().Unlock();
429
430     p->GetUser()->MultiThreadImageReaderSendEvent
431       (p->GetFilename(),
432        MultiThreadImageReaderUser::ImageLoaded,
433        p->GetImage());
434
435     /*
436       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
437       BUGGY : TO FIX 
438     */
439     if (!purge)  return;
440     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
441
442     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
443            
444     mUnloadQueue.insert(p);
445     p->GetImage()->UpdateInformation();
446     p->GetImage()->PropagateUpdateExtent();
447     long ImMem = p->GetImage()->GetEstimatedMemorySize();
448     mTotalMem += ImMem;
449
450     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
451     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
452
453     //  return;
454
455     while (mTotalMem > mTotalMemMax)
456       {
457         GimmickMessage(5,
458                        "   ! Exceeded max of "
459                        << mTotalMemMax << " Ko : unloading oldest image ... "
460                        << std::endl);
461         if ( mUnloadQueue.size() <= 1 ) 
462           {
463              GimmickMessage(5,
464                             "   Only one image : cannot load AND unload it !!"
465                             <<std::endl);
466             break; 
467             
468           }
469         ImageToLoadPtr unload = mUnloadQueue.remove_top();
470         MultiThreadImageReaderUser* user = unload->GetUser();
471
472         /*
473         if ((user!=0)&&(user!=this)) 
474           {
475             user->GetMultiThreadImageReaderUserMutex().Lock();
476           }
477         */
478
479         std::string filename = unload->GetFilename();
480
481         GimmickMessage(5,"'" << filename << "'" << std::endl);
482         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
483
484         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
485
486
487         if (user!=0) 
488           {
489             //      std::cout << "unlock..."<<std::endl;
490             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
491             //      std::cout << "event"<<std::endl;
492             user->MultiThreadImageReaderSendEvent
493               (filename,
494                MultiThreadImageReaderUser::ImageUnloaded,
495                0);
496             //      std::cout << "event ok"<<std::endl;
497
498           }     
499
500         if (unload->Index()>=0)
501           {
502             // GimmickMessage(5,"still in queue"<<std::endl);
503           }
504         unload->Index() = -1;
505
506
507         ImageMapType::iterator it = mImages.find(unload);
508         if (it!=mImages.end())
509           {
510             mImages.erase(it);
511           }
512         //          std::cout << "delete..."<<std::endl;
513         delete unload;
514         //          std::cout << "delete ok."<<std::endl;
515
516       }
517     
518   
519   }
520   //=====================================================================
521
522   //=====================================================================
523   int MultiThreadImageReader::GetMaximalPriority()
524   { 
525     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
526     return GetMaximalPriorityWithoutLocking();
527   }
528   //=====================================================================
529
530
531   //=====================================================================
532   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
533   { 
534     long max = 0;
535     if (mQueue.size()>0) 
536       {
537         max = mQueue.top()->GetPriority();
538       }
539     if (mUnloadQueue.size()>0)
540       {
541         int max2 = mUnloadQueue.top()->GetPriority();
542         if (max2>max) max=max2;
543       }
544     return max;
545   }
546   //=====================================================================
547
548
549   //=====================================================================
550   //=====================================================================
551   //=====================================================================
552   //=====================================================================
553
554   //=====================================================================
555   void*  ThreadedImageReader::Entry()
556   {
557     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
558     //                << std::endl;
559
560     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
561       ("",
562        MultiThreadImageReaderUser::ThreadedReaderStarted,
563        0);
564
565     // While was not deleted 
566     while (!TestDestroy())
567       {
568                 //std::cout << "### Thread "<<GetCurrentId()<<" still alive"  << std::endl;
569           
570         // Lock the mutex
571         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
572         //mMutex.Lock();
573         // If image in queue
574         if (mMultiThreadImageReader->mQueue.size()>0)
575           {
576             MultiThreadImageReader::ImageToLoadPtr i = 
577               mMultiThreadImageReader->mQueue.remove_top();
578
579             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
580             //mMutex.Unlock();
581
582             
583             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
584             //                << i->GetFilename() << "'" << std::endl;
585             
586             // Do the job
587             vtkImageData* im = Read(i->GetFilename());
588
589             // Store it in the map
590             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
591             //mMutex.Lock();
592             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
593             MultiThreadImageReader::ImageMapType::iterator it = 
594               mMultiThreadImageReader->mImages.find(&itl);
595             MultiThreadImageReader::ImageToLoadPtr 
596               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
597               (it->first);
598             pitl->SetImage(im);
599             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
600             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
601             
602             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
603             //                << i->GetFilename() << "' : DONE" << std::endl;
604             
605           }
606         else 
607           {
608             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
609             //mMutex.Unlock();
610             // Wait a little to avoid blocking 
611             Sleep(10);
612           }
613       };
614     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
615     //                << std::endl;
616        
617     return 0;
618   }
619   //=====================================================================
620
621   //=====================================================================
622   void ThreadedImageReader::OnExit()
623   {
624     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
625       ("",
626        MultiThreadImageReaderUser::ThreadedReaderStopped,
627        0);
628   }
629   //=====================================================================
630
631   //=====================================================================
632   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
633   {
634     return mReader.ReadImage(filename);
635   }
636   //=====================================================================
637
638 } // namespace creaImageIO