]> Creatis software - creaImageIO.git/blob - src2/creaImageIOMultiThreadImageReader.cpp
*** empty log message ***
[creaImageIO.git] / src2 / creaImageIOMultiThreadImageReader.cpp
1 #include <creaImageIOMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 #include <wx/utils.h>
4 #include <creaImageIOSystem.h>
5
6 namespace creaImageIO
7 {
8
9   //=====================================================================
10   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
11   ( const std::string& filename,
12     EventType type,
13     vtkImageData* image)
14   {
15     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
16     this->OnMultiThreadImageReaderEvent(filename,type,image);
17   }
18   //=====================================================================
19
20   //=====================================================================
21   class ThreadedImageReader: public wxThread
22   {
23   public:
24     ThreadedImageReader(MultiThreadImageReader* tir) :
25       mMultiThreadImageReader(tir)
26     {}
27
28     void* Entry();
29     void  OnExit();
30
31     vtkImageData* Read(const std::string& filename);
32     
33
34   private:
35     ImageReader mReader;
36     MultiThreadImageReader* mMultiThreadImageReader;
37   };
38   //=====================================================================
39
40   
41   //=====================================================================
42   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
43     : //mDoNotSignal(false),
44       mReader(0),
45       mTotalMem(0),
46       mTotalMemMax(100000)
47   {
48     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
49     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
50
51     // Create the threads
52     for (int i=0; i<number_of_threads; i++) 
53       {
54         ThreadedImageReader* t = new ThreadedImageReader(this);
55         mThreadedImageReaderList.push_back(t);
56       }
57     mNumberOfThreadedReadersRunning = 0;
58     // Init the queue
59     mQueue.set(mComparator);
60     mQueue.set(mIndexer);
61     // 
62     // no thread : alloc self reader
63     if (number_of_threads==0)
64       {
65         mReader = new ImageReader();
66       }
67   }
68   //=====================================================================
69
70
71   //=====================================================================
72   bool MultiThreadImageReader::Start()
73   {
74
75     //    std::cout << "#### MultiThreadImageReader::Start()"
76     //                <<std::endl;
77     ThreadedImageReaderListType::iterator i;
78     for (i =mThreadedImageReaderList.begin();
79          i!=mThreadedImageReaderList.end();
80          i++)
81       {
82         (*i)->Create();
83         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
84           {
85             std::cout << "ERROR starting a thread"<< std::endl;
86             return false;
87           }
88         else 
89           {
90             //      std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
91             //                <<" successfully created"<< std::endl;
92             
93           }
94       }
95     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
96     //    std::cout << "EO Start : #Threads running = "
97     //                << mNumberOfThreadedReadersRunning<<std::endl;
98
99     return true;
100   }
101   //=====================================================================
102
103   //=====================================================================
104   void MultiThreadImageReader::Stop()
105   { 
106 //                  std::cout << "#### MultiThreadImageReader::Stop()"
107 //            <<std::endl;
108   //  std::cout << "Sending stop order to the threads..."<<std::endl;
109
110     ThreadedImageReaderListType::iterator i;
111     for (i =mThreadedImageReaderList.begin();
112          i!=mThreadedImageReaderList.end();
113          i++)
114       {
115         (*i)->Delete();
116       }
117     mThreadedImageReaderList.clear();
118     // Wait a little to be sure that all threads have stopped
119     // A better way to do this ?
120     //    wxMilliSleep(1000);
121     // New method : the threads generate a stop event when they have finished
122     // We wait until all threads have stopped
123 //        std::cout << "Waiting for stop signals..."<<std::endl;
124     do 
125       {
126         // Sleep a little
127         wxMilliSleep(10);
128         // Lock
129         {
130           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
131 //                std::cout << "#Threads running = "
132 //                          << mNumberOfThreadedReadersRunning<<std::endl;
133           // Break if all readers have stopped
134           if (mNumberOfThreadedReadersRunning <= 0) 
135             {
136               break;
137             }
138         }
139       } 
140     while (true);
141 //        std::cout << "All threads stopped : OK "<<std::endl;
142
143     ImageMapType::iterator j;
144     for (j =mImages.begin();
145          j!=mImages.end();
146          ++j)
147
148       {
149         delete j->first;
150       }
151     mImages.clear();
152   }
153   //=====================================================================
154
155   //=====================================================================
156   MultiThreadImageReader::~MultiThreadImageReader()
157   {
158     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
159     //        <<std::endl;
160     Stop();
161     if (mReader) delete mReader;
162   }
163   //=====================================================================
164
165   //=====================================================================
166   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
167                                                     int priority)
168   {
169     // not in unload queue : ciao
170     if (p->UnloadIndex()<0) return;
171     int old_prio = p->GetPriority();
172     if (priority > old_prio) 
173       {
174         p->SetPriority(priority);
175         mUnloadQueue.downsort(p->UnloadIndex());
176       }
177     else if ( old_prio > priority )
178       {
179         p->SetPriority(priority);
180         mUnloadQueue.upsort(p->UnloadIndex());
181      }
182   }
183   //=====================================================================
184
185   //=====================================================================
186   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
187                                         const std::string& filename, 
188                                         int priority )
189   {
190     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
191     
192     if (mThreadedImageReaderList.size()==0) 
193       {
194         // no detached reader : use self reader
195         ImageToLoad itl(user,filename);
196         ImageMapType::iterator i = mImages.find(&itl);
197         if (i!=mImages.end())
198           {
199             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
200             // Already inserted
201             if (pitl->GetImage() != 0)
202               {
203                 // Already read
204                 pitl->SetUser(user);
205                 UpdateUnloadPriority(pitl,priority);
206                 SignalImageRead(pitl,false);
207                 return; // pitl->GetImage();
208               }
209           }
210         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
211         mImages[pitl] = 0;
212         pitl->SetImage(mReader->ReadImage(filename));
213         UpdateUnloadPriority(pitl,priority);
214         SignalImageRead(pitl,true);
215         //      return pitl->GetImage();
216         return;
217       }
218
219
220     ImageToLoad itl(user,filename);
221     ImageMapType::iterator i = mImages.find(&itl);
222     if (i!=mImages.end())
223       {
224         // Already inserted
225         if (i->first->GetImage() != 0)
226           {
227             // Already read : ok :signal the user
228             UpdateUnloadPriority(i->first,priority);
229             SignalImageRead(i->first,false);
230             return;
231           }
232         /// Already requested : change the priority
233         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
234         pitl->SetPriority(priority);
235         // Already in queue
236         if (pitl->Index()>=0) 
237           {
238             // Re-sort the queue
239             mQueue.upsort(pitl->Index());
240           }
241         // Not read but not in queue = being read = ok
242         else 
243           {
244             
245           }
246       }
247     else 
248       {
249         // Never requested before or unloaded 
250         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
251         mImages[pitl] = 0;
252         mQueue.insert(pitl);
253       }
254   }
255   //=====================================================================
256
257   //=====================================================================
258   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
259   (const std::string& filename,
260    MultiThreadImageReaderUser::EventType e,
261    vtkImageData* image)
262   {
263     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
264         (filename == mRequestedFilename))
265       {
266         mRequestedImage = image;
267       }
268     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
269       {
270         mNumberOfThreadedReadersRunning++;
271         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
272       }
273     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
274       {
275         mNumberOfThreadedReadersRunning--;
276         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
277       }
278   }
279   //=====================================================================
280
281   //=====================================================================
282   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
283   {
284     
285     //    std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
286     //        <<std::endl;
287     
288     do 
289       {
290         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
291                 
292         if (mThreadedImageReaderList.size()==0)
293           {
294             ImageToLoad itl(this,filename);
295             ImageMapType::iterator i = mImages.find(&itl);
296             if (i!=mImages.end())
297               {
298                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
299                 // Already inserted
300                 if (pitl->GetImage() != 0)
301                   {
302                     // Already read
303                     UpdateUnloadPriority(pitl,
304                                          GetMaximalPriorityWithoutLocking()+1);
305                     return pitl->GetImage();
306                   }
307               }
308             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
309             mImages[pitl] = 0;
310             pitl->SetImage(mReader->ReadImage(filename));
311             UpdateUnloadPriority(pitl,
312                                  GetMaximalPriorityWithoutLocking()+1);
313             return pitl->GetImage();
314           }
315         
316         mRequestedFilename = filename;
317         mRequestedImage = 0;
318         ImageToLoad itl(this,filename);
319         ImageMapType::iterator i = mImages.find(&itl);
320         if (i!=mImages.end())
321           {
322             // Already inserted in queue
323             if (i->first->GetImage() != 0)
324               {
325                 // Already read : ok : return it 
326                 return i->first->GetImage();
327               }
328             /// Already requested : change the priority
329               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
330               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
331               pitl->SetUser( this );
332               // Already in queue
333               if (pitl->Index()>=0) 
334                 {
335                   // Re-sort the queue
336                   mQueue.upsort(pitl->Index());
337                 }
338               // Not read but not in queue = being read = ok
339               else 
340                 {
341                   pitl->SetUser( this );
342                 }
343           }
344         else 
345           {
346             
347             // Never requested before or unloaded 
348             ImageToLoadPtr pitl = 
349               new ImageToLoad(this,filename,
350                               GetMaximalPriorityWithoutLocking() + 1);
351             mImages[pitl] = 0;
352             mQueue.insert(pitl);
353           }
354       }
355     while (0);
356
357     //    std::cout << "Waiting..."<<std::endl;
358
359     // Waiting that it is read
360     int n = 0;
361     do 
362       {
363         //      std::cout << n++ << std::endl;
364         wxMilliSleep(10);
365         do 
366           {
367             //      wxMutexLocker lock(mMutex);
368             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
369             if (mRequestedImage!=0) 
370               {
371                 return mRequestedImage;
372               } 
373           }
374         while (0);
375       }
376     while (true);
377     // 
378     
379   }
380   //=====================================================================
381   
382   //=====================================================================
383   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
384                                                bool purge)
385   {
386     
387 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
388     //    std::cout << "this="<<this <<std::endl;
389     //    std::cout << "user="<<p->GetUser() <<std::endl;
390
391     if ( p->GetUser() == this ) 
392       GetMultiThreadImageReaderUserMutex().Unlock();
393
394     p->GetUser()->MultiThreadImageReaderSendEvent
395       (p->GetFilename(),
396        MultiThreadImageReaderUser::ImageLoaded,
397        p->GetImage());
398
399     /*
400       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
401       BUGGY : TO FIX 
402     */
403     if (!purge)  return;
404     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
405
406     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
407            
408     mUnloadQueue.insert(p);
409     p->GetImage()->UpdateInformation();
410     p->GetImage()->PropagateUpdateExtent();
411     long ImMem = p->GetImage()->GetEstimatedMemorySize();
412     mTotalMem += ImMem;
413
414     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
415     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
416
417     //  return;
418
419     while (mTotalMem > mTotalMemMax)
420       {
421         GimmickMessage(5,
422                        "   ! Exceeded max of "
423                        << mTotalMemMax << " Ko : unloading oldest image ... "
424                        << std::endl);
425         if ( mUnloadQueue.size() <= 1 ) 
426           {
427              GimmickMessage(5,
428                             "   Only one image : cannot load AND unload it !!"
429                             <<std::endl);
430             break; 
431             
432           }
433         ImageToLoadPtr unload = mUnloadQueue.remove_top();
434         MultiThreadImageReaderUser* user = unload->GetUser();
435
436         if ((user!=0)&&(user!=this)) 
437           {
438             user->GetMultiThreadImageReaderUserMutex().Lock();
439           }
440
441
442         GimmickMessage(5,"'" << unload->GetFilename() << "'" << std::endl);
443         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
444
445         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
446         
447
448         std::string filename = unload->GetFilename();
449         if (unload->Index()>=0)
450           {
451             // GimmickMessage(5,"still in queue"<<std::endl);
452           }
453         unload->Index() = -1;
454
455
456         ImageMapType::iterator it = mImages.find(unload);
457         if (it!=mImages.end())
458           {
459             mImages.erase(it);
460           }
461         //          std::cout << "delete..."<<std::endl;
462         delete unload;
463         //          std::cout << "delete ok."<<std::endl;
464
465         if (user!=0) 
466           {
467             //      std::cout << "unlock..."<<std::endl;
468             user->GetMultiThreadImageReaderUserMutex().Unlock();
469             //      std::cout << "event"<<std::endl;
470             user->MultiThreadImageReaderSendEvent
471               (filename,
472                MultiThreadImageReaderUser::ImageUnloaded,
473                0);
474             //      std::cout << "event ok"<<std::endl;
475
476           }
477       }
478     
479   
480   }
481   //=====================================================================
482
483   //=====================================================================
484   int MultiThreadImageReader::GetMaximalPriority()
485   { 
486     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
487     return GetMaximalPriorityWithoutLocking();
488   }
489   //=====================================================================
490
491
492   //=====================================================================
493   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
494   { 
495     long max = 0;
496     if (mQueue.size()>0) 
497       {
498         max = mQueue.top()->GetPriority();
499       }
500     if (mUnloadQueue.size()>0)
501       {
502         int max2 = mUnloadQueue.top()->GetPriority();
503         if (max2>max) max=max2;
504       }
505     return max;
506   }
507   //=====================================================================
508
509
510   //=====================================================================
511   //=====================================================================
512   //=====================================================================
513   //=====================================================================
514
515   //=====================================================================
516   void*  ThreadedImageReader::Entry()
517   {
518     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
519     //                << std::endl;
520
521     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
522       ("",
523        MultiThreadImageReaderUser::ThreadedReaderStarted,
524        0);
525
526     // While was not deleted 
527     while (!TestDestroy())
528       {
529         //      std::cout << "### Thread "<<GetCurrentId()<<" waiting for image"
530         //        << std::endl;
531           
532         // Lock the mutex
533         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
534         //mMutex.Lock();
535         // If image in queue
536         if (mMultiThreadImageReader->mQueue.size()>0)
537           {
538             MultiThreadImageReader::ImageToLoadPtr i = 
539               mMultiThreadImageReader->mQueue.remove_top();
540
541             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
542             //mMutex.Unlock();
543
544             
545             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
546             //                << i->GetFilename() << "'" << std::endl;
547             
548             // Do the job
549             vtkImageData* im = Read(i->GetFilename());
550
551             // Store it in the map
552             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
553             //mMutex.Lock();
554             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
555             MultiThreadImageReader::ImageMapType::iterator it = 
556               mMultiThreadImageReader->mImages.find(&itl);
557             MultiThreadImageReader::ImageToLoadPtr 
558               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
559               (it->first);
560             pitl->SetImage(im);
561             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
562             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
563             
564             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
565             //                << i->GetFilename() << "' : DONE" << std::endl;
566             
567           }
568         else 
569           {
570             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
571             //mMutex.Unlock();
572             // Wait a little to avoid blocking 
573             Sleep(10);
574           }
575       };
576     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
577     //                << std::endl;
578        
579     return 0;
580   }
581   //=====================================================================
582
583   //=====================================================================
584   void ThreadedImageReader::OnExit()
585   {
586     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
587       ("",
588        MultiThreadImageReaderUser::ThreadedReaderStopped,
589        0);
590   }
591   //=====================================================================
592
593   //=====================================================================
594   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
595   {
596     return mReader.ReadImage(filename);
597   }
598   //=====================================================================
599
600 } // namespace creaImageIO