]> Creatis software - creaImageIO.git/blob - src2/creaImageIOMultiThreadImageReader.cpp
053f77e32e0d1a6f30815f0f47eee7d8c0951590
[creaImageIO.git] / src2 / creaImageIOMultiThreadImageReader.cpp
1 #include <creaImageIOMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 #include <wx/utils.h>
4 #include <creaImageIOSystem.h>
5
6 namespace creaImageIO
7 {
8
9   //=====================================================================
10   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
11   ( const std::string& filename,
12     EventType type,
13     vtkImageData* image)
14   {
15     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
16     this->OnMultiThreadImageReaderEvent(filename,type,image);
17   }
18   //=====================================================================
19
20   //=====================================================================
21   class ThreadedImageReader: public wxThread
22   {
23   public:
24     ThreadedImageReader(MultiThreadImageReader* tir) :
25       mMultiThreadImageReader(tir)
26     {}
27
28     void* Entry();
29     void  OnExit();
30
31     vtkImageData* Read(const std::string& filename);
32     
33
34   private:
35     ImageReader mReader;
36     MultiThreadImageReader* mMultiThreadImageReader;
37   };
38   //=====================================================================
39
40   
41   //=====================================================================
42   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
43     : //mDoNotSignal(false),
44       mReader(0),
45       mTotalMem(0),
46       mTotalMemMax(100000)
47   {
48     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
49     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
50
51     // Create the threads
52     for (int i=0; i<number_of_threads; i++) 
53       {
54         ThreadedImageReader* t = new ThreadedImageReader(this);
55         mThreadedImageReaderList.push_back(t);
56       }
57     mNumberOfThreadedReadersRunning = 0;
58     // Init the queue
59     mQueue.set(mComparator);
60     mQueue.set(mIndexer);
61     // 
62     // no thread : alloc self reader
63 //    if (number_of_threads==0)
64 //      {
65         mReader = new ImageReader();
66 //      }
67   }
68   //=====================================================================
69
70
71   //=====================================================================
72   bool MultiThreadImageReader::Start()
73   {
74
75     //    std::cout << "#### MultiThreadImageReader::Start()"
76     //                <<std::endl;
77           if (mNumberOfThreadedReadersRunning > 0) return true;
78           
79     ThreadedImageReaderListType::iterator i;
80     for (i =mThreadedImageReaderList.begin();
81          i!=mThreadedImageReaderList.end();
82          i++)
83       {
84         (*i)->Create();
85         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
86           {
87             std::cout << "ERROR starting a thread"<< std::endl;
88             return false;
89           }
90         else 
91           {
92             //      std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
93             //                <<" successfully created"<< std::endl;
94             
95           }
96       }
97     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
98     //    std::cout << "EO Start : #Threads running = "
99     //                << mNumberOfThreadedReadersRunning<<std::endl;
100
101     return true;
102   }
103   //=====================================================================
104
105   //=====================================================================
106   void MultiThreadImageReader::Stop()
107   { 
108 //                  std::cout << "#### MultiThreadImageReader::Stop()"
109 //            <<std::endl;
110   //  std::cout << "Sending stop order to the threads..."<<std::endl;
111
112     ThreadedImageReaderListType::iterator i;
113     for (i =mThreadedImageReaderList.begin();
114          i!=mThreadedImageReaderList.end();
115          i++)
116       {
117         (*i)->Delete();
118       }
119     mThreadedImageReaderList.clear();
120     // Wait a little to be sure that all threads have stopped
121     // A better way to do this ?
122     //    wxMilliSleep(1000);
123     // New method : the threads generate a stop event when they have finished
124     // We wait until all threads have stopped
125 //        std::cout << "Waiting for stop signals..."<<std::endl;
126     do 
127       {
128         // Sleep a little
129         wxMilliSleep(10);
130         // Lock
131         {
132           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
133 //                std::cout << "#Threads running = "
134 //                          << mNumberOfThreadedReadersRunning<<std::endl;
135           // Break if all readers have stopped
136           if (mNumberOfThreadedReadersRunning <= 0) 
137             {
138               break;
139             }
140         }
141       } 
142     while (true);
143 //        std::cout << "All threads stopped : OK "<<std::endl;
144
145     ImageMapType::iterator j;
146     for (j =mImages.begin();
147          j!=mImages.end();
148          ++j)
149
150       {
151         delete j->first;
152       }
153     mImages.clear();
154   }
155   //=====================================================================
156
157   //=====================================================================
158   MultiThreadImageReader::~MultiThreadImageReader()
159   {
160     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
161     //        <<std::endl;
162     Stop();
163     if (mReader) delete mReader;
164   }
165   //=====================================================================
166
167   //=====================================================================
168   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
169                                                     int priority)
170   {
171     // not in unload queue : ciao
172     if (p->UnloadIndex()<0) return;
173     int old_prio = p->GetPriority();
174     if (priority > old_prio) 
175       {
176         p->SetPriority(priority);
177         mUnloadQueue.downsort(p->UnloadIndex());
178       }
179     else if ( old_prio > priority )
180       {
181         p->SetPriority(priority);
182         mUnloadQueue.upsort(p->UnloadIndex());
183      }
184   }
185   //=====================================================================
186
187   //=====================================================================
188   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
189                                         const std::string& filename, 
190                                         int priority )
191   {
192         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
193     
194           if (mNumberOfThreadedReadersRunning==0)
195 //    if (mThreadedImageReaderList.size()==0) 
196       {
197         // no detached reader : use self reader
198         ImageToLoad itl(user,filename);
199         ImageMapType::iterator i = mImages.find(&itl);
200         if (i!=mImages.end())
201           {
202             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
203             // Already inserted
204             if (pitl->GetImage() != 0)
205               {
206                 // Already read
207                 pitl->SetUser(user);
208                 UpdateUnloadPriority(pitl,priority);
209                 SignalImageRead(pitl,false);
210                 return; // pitl->GetImage();
211               }
212           }
213         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
214         mImages[pitl] = 0;
215         pitl->SetImage(mReader->ReadImage(filename));
216         UpdateUnloadPriority(pitl,priority);
217         SignalImageRead(pitl,true);
218         //      return pitl->GetImage();
219         return;
220       }
221
222
223     ImageToLoad itl(user,filename);
224     ImageMapType::iterator i = mImages.find(&itl);
225     if (i!=mImages.end())
226       {
227         // Already inserted
228         if (i->first->GetImage() != 0)
229           {
230             // Already read : ok :signal the user
231             UpdateUnloadPriority(i->first,priority);
232             SignalImageRead(i->first,false);
233             return;
234           }
235         /// Already requested : change the priority
236         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
237         pitl->SetPriority(priority);
238         // Already in queue
239         if (pitl->Index()>=0) 
240           {
241             // Re-sort the queue
242             mQueue.upsort(pitl->Index());
243           }
244         // Not read but not in queue = being read = ok
245         else 
246           {
247             
248           }
249       }
250     else 
251       {
252         // Never requested before or unloaded 
253         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
254         mImages[pitl] = 0;
255         mQueue.insert(pitl);
256       }
257   }
258   //=====================================================================
259
260   //=====================================================================
261   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
262   (const std::string& filename,
263    MultiThreadImageReaderUser::EventType e,
264    vtkImageData* image)
265   {
266     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
267         (filename == mRequestedFilename))
268       {
269         mRequestedImage = image;
270       }
271     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
272       {
273         mNumberOfThreadedReadersRunning++;
274         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
275       }
276     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
277       {
278         mNumberOfThreadedReadersRunning--;
279         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
280       }
281   }
282   //=====================================================================
283
284   //=====================================================================
285   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
286   {
287          // Start();
288     //    std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
289     //        <<std::endl;
290     
291     do 
292       {
293         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
294                 
295                   if (mNumberOfThreadedReadersRunning==0)
296 //      if (mThreadedImageReaderList.size()==0)
297           {
298             ImageToLoad itl(this,filename);
299             ImageMapType::iterator i = mImages.find(&itl);
300             if (i!=mImages.end())
301               {
302                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
303                 // Already inserted
304                 if (pitl->GetImage() != 0)
305                   {
306                     // Already read
307                     UpdateUnloadPriority(pitl,
308                                          GetMaximalPriorityWithoutLocking()+1);
309                     return pitl->GetImage();
310                   }
311               }
312             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
313             mImages[pitl] = 0;
314             pitl->SetImage(mReader->ReadImage(filename));
315             UpdateUnloadPriority(pitl,
316                                  GetMaximalPriorityWithoutLocking()+1);
317             return pitl->GetImage();
318           }
319         
320         mRequestedFilename = filename;
321         mRequestedImage = 0;
322         ImageToLoad itl(this,filename);
323         ImageMapType::iterator i = mImages.find(&itl);
324         if (i!=mImages.end())
325           {
326             // Already inserted in queue
327             if (i->first->GetImage() != 0)
328               {
329                 // Already read : ok : return it 
330                 return i->first->GetImage();
331               }
332             /// Already requested : change the priority
333               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
334               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
335               pitl->SetUser( this );
336               // Already in queue
337               if (pitl->Index()>=0) 
338                 {
339                   // Re-sort the queue
340                   mQueue.upsort(pitl->Index());
341                 }
342               // Not read but not in queue = being read = ok
343               else 
344                 {
345                   pitl->SetUser( this );
346                 }
347           }
348         else 
349           {
350             
351             // Never requested before or unloaded 
352             ImageToLoadPtr pitl = 
353               new ImageToLoad(this,filename,
354                               GetMaximalPriorityWithoutLocking() + 1);
355             mImages[pitl] = 0;
356             mQueue.insert(pitl);
357           }
358       }
359     while (0);
360
361     //    std::cout << "Waiting..."<<std::endl;
362
363     // Waiting that it is read
364     int n = 0;
365     do 
366       {
367         //      std::cout << n++ << std::endl;
368         wxMilliSleep(10);
369         do 
370           {
371             //      wxMutexLocker lock(mMutex);
372             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
373             if (mRequestedImage!=0) 
374               {
375                 return mRequestedImage;
376               } 
377           }
378         while (0);
379       }
380     while (true);
381     // 
382     
383   }
384   //=====================================================================
385   
386   //=====================================================================
387   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
388                                                bool purge)
389   {
390     
391 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
392     //    std::cout << "this="<<this <<std::endl;
393     //    std::cout << "user="<<p->GetUser() <<std::endl;
394
395     if ( p->GetUser() == this ) 
396       GetMultiThreadImageReaderUserMutex().Unlock();
397
398     p->GetUser()->MultiThreadImageReaderSendEvent
399       (p->GetFilename(),
400        MultiThreadImageReaderUser::ImageLoaded,
401        p->GetImage());
402
403     /*
404       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
405       BUGGY : TO FIX 
406     */
407     if (!purge)  return;
408     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
409
410     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
411            
412     mUnloadQueue.insert(p);
413     p->GetImage()->UpdateInformation();
414     p->GetImage()->PropagateUpdateExtent();
415     long ImMem = p->GetImage()->GetEstimatedMemorySize();
416     mTotalMem += ImMem;
417
418     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
419     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
420
421     //  return;
422
423     while (mTotalMem > mTotalMemMax)
424       {
425         GimmickMessage(5,
426                        "   ! Exceeded max of "
427                        << mTotalMemMax << " Ko : unloading oldest image ... "
428                        << std::endl);
429         if ( mUnloadQueue.size() <= 1 ) 
430           {
431              GimmickMessage(5,
432                             "   Only one image : cannot load AND unload it !!"
433                             <<std::endl);
434             break; 
435             
436           }
437         ImageToLoadPtr unload = mUnloadQueue.remove_top();
438         MultiThreadImageReaderUser* user = unload->GetUser();
439
440         if ((user!=0)&&(user!=this)) 
441           {
442             user->GetMultiThreadImageReaderUserMutex().Lock();
443           }
444
445
446         GimmickMessage(5,"'" << unload->GetFilename() << "'" << std::endl);
447         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
448
449         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
450         
451
452         std::string filename = unload->GetFilename();
453         if (unload->Index()>=0)
454           {
455             // GimmickMessage(5,"still in queue"<<std::endl);
456           }
457         unload->Index() = -1;
458
459
460         ImageMapType::iterator it = mImages.find(unload);
461         if (it!=mImages.end())
462           {
463             mImages.erase(it);
464           }
465         //          std::cout << "delete..."<<std::endl;
466         delete unload;
467         //          std::cout << "delete ok."<<std::endl;
468
469         if (user!=0) 
470           {
471             //      std::cout << "unlock..."<<std::endl;
472             user->GetMultiThreadImageReaderUserMutex().Unlock();
473             //      std::cout << "event"<<std::endl;
474             user->MultiThreadImageReaderSendEvent
475               (filename,
476                MultiThreadImageReaderUser::ImageUnloaded,
477                0);
478             //      std::cout << "event ok"<<std::endl;
479
480           }
481       }
482     
483   
484   }
485   //=====================================================================
486
487   //=====================================================================
488   int MultiThreadImageReader::GetMaximalPriority()
489   { 
490     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
491     return GetMaximalPriorityWithoutLocking();
492   }
493   //=====================================================================
494
495
496   //=====================================================================
497   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
498   { 
499     long max = 0;
500     if (mQueue.size()>0) 
501       {
502         max = mQueue.top()->GetPriority();
503       }
504     if (mUnloadQueue.size()>0)
505       {
506         int max2 = mUnloadQueue.top()->GetPriority();
507         if (max2>max) max=max2;
508       }
509     return max;
510   }
511   //=====================================================================
512
513
514   //=====================================================================
515   //=====================================================================
516   //=====================================================================
517   //=====================================================================
518
519   //=====================================================================
520   void*  ThreadedImageReader::Entry()
521   {
522     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
523     //                << std::endl;
524
525     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
526       ("",
527        MultiThreadImageReaderUser::ThreadedReaderStarted,
528        0);
529
530     // While was not deleted 
531     while (!TestDestroy())
532       {
533         //      std::cout << "### Thread "<<GetCurrentId()<<" waiting for image"
534         //        << std::endl;
535           
536         // Lock the mutex
537         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
538         //mMutex.Lock();
539         // If image in queue
540         if (mMultiThreadImageReader->mQueue.size()>0)
541           {
542             MultiThreadImageReader::ImageToLoadPtr i = 
543               mMultiThreadImageReader->mQueue.remove_top();
544
545             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
546             //mMutex.Unlock();
547
548             
549             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
550             //                << i->GetFilename() << "'" << std::endl;
551             
552             // Do the job
553             vtkImageData* im = Read(i->GetFilename());
554
555             // Store it in the map
556             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
557             //mMutex.Lock();
558             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
559             MultiThreadImageReader::ImageMapType::iterator it = 
560               mMultiThreadImageReader->mImages.find(&itl);
561             MultiThreadImageReader::ImageToLoadPtr 
562               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
563               (it->first);
564             pitl->SetImage(im);
565             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
566             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
567             
568             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
569             //                << i->GetFilename() << "' : DONE" << std::endl;
570             
571           }
572         else 
573           {
574             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
575             //mMutex.Unlock();
576             // Wait a little to avoid blocking 
577             Sleep(10);
578           }
579       };
580     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
581     //                << std::endl;
582        
583     return 0;
584   }
585   //=====================================================================
586
587   //=====================================================================
588   void ThreadedImageReader::OnExit()
589   {
590     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
591       ("",
592        MultiThreadImageReaderUser::ThreadedReaderStopped,
593        0);
594   }
595   //=====================================================================
596
597   //=====================================================================
598   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
599   {
600     return mReader.ReadImage(filename);
601   }
602   //=====================================================================
603
604 } // namespace creaImageIO