]> Creatis software - creaImageIO.git/blob - src2/creaImageIOMultiThreadImageReader.cpp
*** empty log message ***
[creaImageIO.git] / src2 / creaImageIOMultiThreadImageReader.cpp
1 #include <creaImageIOMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 #include <wx/utils.h>
4 #include <creaImageIOSystem.h>
5
6 namespace creaImageIO
7 {
8
9   //=====================================================================
10   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
11   ( const std::string& filename,
12     EventType type,
13     vtkImageData* image)
14   {
15     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
16     this->OnMultiThreadImageReaderEvent(filename,type,image);
17   }
18   //=====================================================================
19
20   //=====================================================================
21   class ThreadedImageReader: public wxThread
22   {
23   public:
24     ThreadedImageReader(MultiThreadImageReader* tir) :
25       mMultiThreadImageReader(tir)
26     {}
27
28     void* Entry();
29     void  OnExit();
30
31     vtkImageData* Read(const std::string& filename);
32     
33
34   private:
35     ImageReader mReader;
36     MultiThreadImageReader* mMultiThreadImageReader;
37         
38   };
39   //=====================================================================
40
41   
42   //=====================================================================
43   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
44     : //mDoNotSignal(false),
45       mReader(0),
46       mTotalMem(0),
47       mTotalMemMax(1000000)
48   {
49     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
50     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
51
52     // Create the threads
53     for (int i=0; i<number_of_threads; i++) 
54       {
55         ThreadedImageReader* t = new ThreadedImageReader(this);
56         mThreadedImageReaderList.push_back(t);
57       }
58     mNumberOfThreadedReadersRunning = 0;
59     // Init the queue
60     mQueue.set(mComparator);
61     mQueue.set(mIndexer);
62     // 
63     // no thread : alloc self reader
64 //    if (number_of_threads==0)
65 //      {
66         mReader = new ImageReader();
67 //      }
68   }
69   //=====================================================================
70
71
72   //=====================================================================
73   bool MultiThreadImageReader::Start()
74   {
75
76     //    std::cout << "#### MultiThreadImageReader::Start()"
77     //                <<std::endl;
78           if (mNumberOfThreadedReadersRunning > 0) return true;
79           
80     ThreadedImageReaderListType::iterator i;
81     for (i =mThreadedImageReaderList.begin();
82          i!=mThreadedImageReaderList.end();
83          i++)
84       {
85         (*i)->Create();
86         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
87           {
88             std::cout << "ERROR starting a thread"<< std::endl;
89             return false;
90           }
91         else 
92           {
93             //      std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
94             //                <<" successfully created"<< std::endl;
95             
96           }
97       }
98     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
99     //    std::cout << "EO Start : #Threads running = "
100     //                << mNumberOfThreadedReadersRunning<<std::endl;
101
102     return true;
103   }
104   //=====================================================================
105
106   //=====================================================================
107   void MultiThreadImageReader::Stop()
108   { 
109 //                  std::cout << "#### MultiThreadImageReader::Stop()"
110 //            <<std::endl;
111   //  std::cout << "Sending stop order to the threads..."<<std::endl;
112
113     ThreadedImageReaderListType::iterator i;
114     for (i =mThreadedImageReaderList.begin();
115          i!=mThreadedImageReaderList.end();
116          i++)
117       {
118                   if((*i)->IsAlive())
119                   {
120                         (*i)->Delete();
121                   }
122       }
123     mThreadedImageReaderList.clear();
124     // Wait a little to be sure that all threads have stopped
125     // A better way to do this ?
126     //    wxMilliSleep(1000);
127     // New method : the threads generate a stop event when they have finished
128     // We wait until all threads have stopped
129 //        std::cout << "Waiting for stop signals..."<<std::endl;
130     do 
131       {
132         // Sleep a little
133         wxMilliSleep(10);
134         // Lock
135         {
136           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
137 //                std::cout << "#Threads running = "
138 //                          << mNumberOfThreadedReadersRunning<<std::endl;
139           // Break if all readers have stopped
140           if (mNumberOfThreadedReadersRunning <= 0) 
141             {
142               break;
143             }
144         }
145       } 
146     while (true);
147 //        std::cout << "All threads stopped : OK "<<std::endl;
148
149     ImageMapType::iterator j;
150     for (j =mImages.begin();
151          j!=mImages.end();
152          ++j)
153
154       {
155         delete j->first;
156       }
157     mImages.clear();
158   }
159   //=====================================================================
160
161   //=====================================================================
162   MultiThreadImageReader::~MultiThreadImageReader()
163   {
164     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
165     //        <<std::endl;
166     Stop();
167     if (mReader) delete mReader;
168   }
169   //=====================================================================
170
171   //=====================================================================
172   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
173                                                     int priority)
174   {
175     // not in unload queue : ciao
176     if (p->UnloadIndex()<0) return;
177     int old_prio = p->GetPriority();
178     if (priority > old_prio) 
179       {
180         p->SetPriority(priority);
181         mUnloadQueue.downsort(p->UnloadIndex());
182       }
183     else if ( old_prio > priority )
184       {
185         p->SetPriority(priority);
186         mUnloadQueue.upsort(p->UnloadIndex());
187      }
188   }
189   //=====================================================================
190
191   //=====================================================================
192   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
193                                         const std::string& filename, 
194                                         int priority )
195   {
196         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
197     
198           if (mNumberOfThreadedReadersRunning==0)
199 //    if (mThreadedImageReaderList.size()==0) 
200       {
201         // no detached reader : use self reader
202         ImageToLoad itl(user,filename);
203         ImageMapType::iterator i = mImages.find(&itl);
204         if (i!=mImages.end())
205           {
206             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
207             // Already inserted
208             if (pitl->GetImage() != 0)
209               {
210                 // Already read
211                 pitl->SetUser(user);
212                 UpdateUnloadPriority(pitl,priority);
213                 SignalImageRead(pitl,false);
214                 return; // pitl->GetImage();
215               }
216           }
217         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
218         mImages[pitl] = 0;
219         pitl->SetImage(mReader->ReadImage(filename));
220         UpdateUnloadPriority(pitl,priority);
221         SignalImageRead(pitl,true);
222         //      return pitl->GetImage();
223         return;
224       }
225
226
227     ImageToLoad itl(user,filename);
228     ImageMapType::iterator i = mImages.find(&itl);
229     if (i!=mImages.end())
230       {
231         // Already inserted
232         if (i->first->GetImage() != 0)
233           {
234             // Already read : ok :signal the user
235             UpdateUnloadPriority(i->first,priority);
236             SignalImageRead(i->first,false);
237             return;
238           }
239         /// Already requested : change the priority
240         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
241         pitl->SetPriority(priority);
242         // Already in queue
243         if (pitl->Index()>=0) 
244           {
245             // Re-sort the queue
246             mQueue.upsort(pitl->Index());
247           }
248         // Not read but not in queue = being read = ok
249         else 
250           {
251             
252           }
253       }
254     else 
255       {
256         // Never requested before or unloaded 
257         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
258         mImages[pitl] = 0;
259         mQueue.insert(pitl);
260       }
261   }
262   //=====================================================================
263
264   //=====================================================================
265   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
266   (const std::string& filename,
267    MultiThreadImageReaderUser::EventType e,
268    vtkImageData* image)
269   {
270     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
271         (filename == mRequestedFilename))
272       {
273         mRequestedImage = image;
274       }
275     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
276       {
277         mNumberOfThreadedReadersRunning++;
278         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
279       }
280     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
281       {
282         mNumberOfThreadedReadersRunning--;
283         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
284       }
285   }
286   //=====================================================================
287
288   //=====================================================================
289   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
290   {
291          // Start();
292     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
293     //           <<std::endl;
294     
295     do 
296       {
297         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
298                 
299         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
300         //             <<"') lock ok"
301         //               <<std::endl;
302     
303         //                if (mNumberOfThreadedReadersRunning==0)
304         //      if (mThreadedImageReaderList.size()==0)
305         if (true)
306           {
307             ImageToLoad itl(this,filename);
308             ImageMapType::iterator i = mImages.find(&itl);
309             if (i!=mImages.end())
310               {
311                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
312                 // Already inserted
313                 if (pitl->GetImage() != 0)
314                   {
315                     // Already read
316                     UpdateUnloadPriority(pitl,
317                                          GetMaximalPriorityWithoutLocking()+1);
318                     return pitl->GetImage();
319                   }
320               }
321             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
322             mImages[pitl] = 0;
323             pitl->SetImage(mReader->ReadImage(filename));
324             UpdateUnloadPriority(pitl,
325                                  GetMaximalPriorityWithoutLocking()+1);
326             return pitl->GetImage();
327           }
328
329         /*      
330         mRequestedFilename = filename;
331         mRequestedImage = 0;
332         ImageToLoad itl(this,filename);
333         ImageMapType::iterator i = mImages.find(&itl);
334         if (i!=mImages.end())
335           {
336             // Already inserted in queue
337             if (i->first->GetImage() != 0)
338               {
339                 // Already read : ok : return it 
340                 return i->first->GetImage();
341               }
342             /// Already requested : change the priority
343               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
344               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
345               pitl->SetUser( this );
346               // Already in queue
347               if (pitl->Index()>=0) 
348                 {
349                   // Re-sort the queue
350                   mQueue.upsort(pitl->Index());
351                 }
352               // Not read but not in queue = being read = ok
353               else 
354                 {
355                   pitl->SetUser( this );
356                 }
357           }
358         else 
359           {
360             
361             // Never requested before or unloaded 
362             ImageToLoadPtr pitl = 
363               new ImageToLoad(this,filename,
364                               GetMaximalPriorityWithoutLocking() + 1);
365             mImages[pitl] = 0;
366             mQueue.insert(pitl);
367           }
368         */
369       }
370     while (0);
371
372     //    std::cout << "Waiting..."<<std::endl;
373
374     /*
375     // Waiting that it is read
376     int n = 0;
377     do 
378       {
379         //      std::cout << n++ << std::endl;
380         wxMilliSleep(10);
381         do 
382           {
383             //      wxMutexLocker lock(mMutex);
384             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
385             if (mRequestedImage!=0) 
386               {
387                 return mRequestedImage;
388               } 
389           }
390         while (0);
391       }
392     while (true);
393     // 
394     */
395   }
396   //=====================================================================
397   
398   //=====================================================================
399   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
400                                                bool purge)
401   {
402     
403 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
404     //    std::cout << "this="<<this <<std::endl;
405     //    std::cout << "user="<<p->GetUser() <<std::endl;
406
407     if ( p->GetUser() == this ) 
408       GetMultiThreadImageReaderUserMutex().Unlock();
409
410     p->GetUser()->MultiThreadImageReaderSendEvent
411       (p->GetFilename(),
412        MultiThreadImageReaderUser::ImageLoaded,
413        p->GetImage());
414
415     /*
416       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
417       BUGGY : TO FIX 
418     */
419     if (!purge)  return;
420     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
421
422     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
423            
424     mUnloadQueue.insert(p);
425     p->GetImage()->UpdateInformation();
426     p->GetImage()->PropagateUpdateExtent();
427     long ImMem = p->GetImage()->GetEstimatedMemorySize();
428     mTotalMem += ImMem;
429
430     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
431     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
432
433     //  return;
434
435     while (mTotalMem > mTotalMemMax)
436       {
437         GimmickMessage(5,
438                        "   ! Exceeded max of "
439                        << mTotalMemMax << " Ko : unloading oldest image ... "
440                        << std::endl);
441         if ( mUnloadQueue.size() <= 1 ) 
442           {
443              GimmickMessage(5,
444                             "   Only one image : cannot load AND unload it !!"
445                             <<std::endl);
446             break; 
447             
448           }
449         ImageToLoadPtr unload = mUnloadQueue.remove_top();
450         MultiThreadImageReaderUser* user = unload->GetUser();
451
452         /*
453         if ((user!=0)&&(user!=this)) 
454           {
455             user->GetMultiThreadImageReaderUserMutex().Lock();
456           }
457         */
458
459         std::string filename = unload->GetFilename();
460
461         GimmickMessage(5,"'" << filename << "'" << std::endl);
462         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
463
464         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
465
466
467         if (user!=0) 
468           {
469             //      std::cout << "unlock..."<<std::endl;
470             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
471             //      std::cout << "event"<<std::endl;
472             user->MultiThreadImageReaderSendEvent
473               (filename,
474                MultiThreadImageReaderUser::ImageUnloaded,
475                0);
476             //      std::cout << "event ok"<<std::endl;
477
478           }     
479
480         if (unload->Index()>=0)
481           {
482             // GimmickMessage(5,"still in queue"<<std::endl);
483           }
484         unload->Index() = -1;
485
486
487         ImageMapType::iterator it = mImages.find(unload);
488         if (it!=mImages.end())
489           {
490             mImages.erase(it);
491           }
492         //          std::cout << "delete..."<<std::endl;
493         delete unload;
494         //          std::cout << "delete ok."<<std::endl;
495
496       }
497     
498   
499   }
500   //=====================================================================
501
502   //=====================================================================
503   int MultiThreadImageReader::GetMaximalPriority()
504   { 
505     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
506     return GetMaximalPriorityWithoutLocking();
507   }
508   //=====================================================================
509
510
511   //=====================================================================
512   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
513   { 
514     long max = 0;
515     if (mQueue.size()>0) 
516       {
517         max = mQueue.top()->GetPriority();
518       }
519     if (mUnloadQueue.size()>0)
520       {
521         int max2 = mUnloadQueue.top()->GetPriority();
522         if (max2>max) max=max2;
523       }
524     return max;
525   }
526   //=====================================================================
527
528
529   //=====================================================================
530   //=====================================================================
531   //=====================================================================
532   //=====================================================================
533
534   //=====================================================================
535   void*  ThreadedImageReader::Entry()
536   {
537     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
538     //                << std::endl;
539
540     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
541       ("",
542        MultiThreadImageReaderUser::ThreadedReaderStarted,
543        0);
544
545     // While was not deleted 
546     while (!TestDestroy())
547       {
548         //      std::cout << "### Thread "<<GetCurrentId()<<" waiting for image"
549         //        << std::endl;
550           
551         // Lock the mutex
552         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
553         //mMutex.Lock();
554         // If image in queue
555         if (mMultiThreadImageReader->mQueue.size()>0)
556           {
557             MultiThreadImageReader::ImageToLoadPtr i = 
558               mMultiThreadImageReader->mQueue.remove_top();
559
560             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
561             //mMutex.Unlock();
562
563             
564             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
565             //                << i->GetFilename() << "'" << std::endl;
566             
567             // Do the job
568             vtkImageData* im = Read(i->GetFilename());
569
570             // Store it in the map
571             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
572             //mMutex.Lock();
573             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
574             MultiThreadImageReader::ImageMapType::iterator it = 
575               mMultiThreadImageReader->mImages.find(&itl);
576             MultiThreadImageReader::ImageToLoadPtr 
577               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
578               (it->first);
579             pitl->SetImage(im);
580             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
581             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
582             
583             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
584             //                << i->GetFilename() << "' : DONE" << std::endl;
585             
586           }
587         else 
588           {
589             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
590             //mMutex.Unlock();
591             // Wait a little to avoid blocking 
592             Sleep(10);
593           }
594       };
595     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
596     //                << std::endl;
597        
598     return 0;
599   }
600   //=====================================================================
601
602   //=====================================================================
603   void ThreadedImageReader::OnExit()
604   {
605     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
606       ("",
607        MultiThreadImageReaderUser::ThreadedReaderStopped,
608        0);
609   }
610   //=====================================================================
611
612   //=====================================================================
613   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
614   {
615     return mReader.ReadImage(filename);
616   }
617   //=====================================================================
618
619 } // namespace creaImageIO