]> Creatis software - creaImageIO.git/blob - src2/creaImageIOMultiThreadImageReader.cpp
*** empty log message ***
[creaImageIO.git] / src2 / creaImageIOMultiThreadImageReader.cpp
1 #include <creaImageIOMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 #include <wx/utils.h>
4 #include <creaImageIOSystem.h>
5
6 namespace creaImageIO
7 {
8
9   //=====================================================================
10   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
11   ( const std::string& filename,
12     EventType type,
13     vtkImageData* image)
14   {
15     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
16     this->OnMultiThreadImageReaderEvent(filename,type,image);
17   }
18   //=====================================================================
19
20   //=====================================================================
21   class ThreadedImageReader: public wxThread
22   {
23   public:
24     ThreadedImageReader(MultiThreadImageReader* tir) :
25       mMultiThreadImageReader(tir)
26     {}
27
28     void* Entry();
29     void  OnExit();
30
31     vtkImageData* Read(const std::string& filename);
32     
33
34   private:
35     ImageReader mReader;
36     MultiThreadImageReader* mMultiThreadImageReader;
37   };
38   //=====================================================================
39
40   
41   //=====================================================================
42   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
43     : //mDoNotSignal(false),
44       mReader(0),
45       mTotalMem(0),
46       mTotalMemMax(1500000)
47   {
48     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
49     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
50
51     // Create the threads
52     for (int i=0; i<number_of_threads; i++) 
53       {
54         ThreadedImageReader* t = new ThreadedImageReader(this);
55         mThreadedImageReaderList.push_back(t);
56       }
57     mNumberOfThreadedReadersRunning = 0;
58     // Init the queue
59     mQueue.set(mComparator);
60     mQueue.set(mIndexer);
61     // 
62     // no thread : alloc self reader
63 //    if (number_of_threads==0)
64 //      {
65         mReader = new ImageReader();
66 //      }
67   }
68   //=====================================================================
69
70
71   //=====================================================================
72   bool MultiThreadImageReader::Start()
73   {
74
75     //    std::cout << "#### MultiThreadImageReader::Start()"
76     //                <<std::endl;
77           if (mNumberOfThreadedReadersRunning > 0) return true;
78           
79     ThreadedImageReaderListType::iterator i;
80     for (i =mThreadedImageReaderList.begin();
81          i!=mThreadedImageReaderList.end();
82          i++)
83       {
84         (*i)->Create();
85         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
86           {
87             std::cout << "ERROR starting a thread"<< std::endl;
88             return false;
89           }
90         else 
91           {
92             //      std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
93             //                <<" successfully created"<< std::endl;
94             
95           }
96       }
97     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
98     //    std::cout << "EO Start : #Threads running = "
99     //                << mNumberOfThreadedReadersRunning<<std::endl;
100
101     return true;
102   }
103   //=====================================================================
104
105   //=====================================================================
106   void MultiThreadImageReader::Stop()
107   { 
108 //                  std::cout << "#### MultiThreadImageReader::Stop()"
109 //            <<std::endl;
110   //  std::cout << "Sending stop order to the threads..."<<std::endl;
111
112     ThreadedImageReaderListType::iterator i;
113     for (i =mThreadedImageReaderList.begin();
114          i!=mThreadedImageReaderList.end();
115          i++)
116       {
117         (*i)->Delete();
118       }
119     mThreadedImageReaderList.clear();
120     // Wait a little to be sure that all threads have stopped
121     // A better way to do this ?
122     //    wxMilliSleep(1000);
123     // New method : the threads generate a stop event when they have finished
124     // We wait until all threads have stopped
125 //        std::cout << "Waiting for stop signals..."<<std::endl;
126     do 
127       {
128         // Sleep a little
129         wxMilliSleep(10);
130         // Lock
131         {
132           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
133 //                std::cout << "#Threads running = "
134 //                          << mNumberOfThreadedReadersRunning<<std::endl;
135           // Break if all readers have stopped
136           if (mNumberOfThreadedReadersRunning <= 0) 
137             {
138               break;
139             }
140         }
141       } 
142     while (true);
143 //        std::cout << "All threads stopped : OK "<<std::endl;
144
145     ImageMapType::iterator j;
146     for (j =mImages.begin();
147          j!=mImages.end();
148          ++j)
149
150       {
151         delete j->first;
152       }
153     mImages.clear();
154   }
155   //=====================================================================
156
157   //=====================================================================
158   MultiThreadImageReader::~MultiThreadImageReader()
159   {
160     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
161     //        <<std::endl;
162     Stop();
163     if (mReader) delete mReader;
164   }
165   //=====================================================================
166
167   //=====================================================================
168   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
169                                                     int priority)
170   {
171     // not in unload queue : ciao
172     if (p->UnloadIndex()<0) return;
173     int old_prio = p->GetPriority();
174     if (priority > old_prio) 
175       {
176         p->SetPriority(priority);
177         mUnloadQueue.downsort(p->UnloadIndex());
178       }
179     else if ( old_prio > priority )
180       {
181         p->SetPriority(priority);
182         mUnloadQueue.upsort(p->UnloadIndex());
183      }
184   }
185   //=====================================================================
186
187   //=====================================================================
188   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
189                                         const std::string& filename, 
190                                         int priority )
191   {
192         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
193     
194           if (mNumberOfThreadedReadersRunning==0)
195 //    if (mThreadedImageReaderList.size()==0) 
196       {
197         // no detached reader : use self reader
198         ImageToLoad itl(user,filename);
199         ImageMapType::iterator i = mImages.find(&itl);
200         if (i!=mImages.end())
201           {
202             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
203             // Already inserted
204             if (pitl->GetImage() != 0)
205               {
206                 // Already read
207                 pitl->SetUser(user);
208                 UpdateUnloadPriority(pitl,priority);
209                 SignalImageRead(pitl,false);
210                 return; // pitl->GetImage();
211               }
212           }
213         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
214         mImages[pitl] = 0;
215         pitl->SetImage(mReader->ReadImage(filename));
216         UpdateUnloadPriority(pitl,priority);
217         SignalImageRead(pitl,true);
218         //      return pitl->GetImage();
219         return;
220       }
221
222
223     ImageToLoad itl(user,filename);
224     ImageMapType::iterator i = mImages.find(&itl);
225     if (i!=mImages.end())
226       {
227         // Already inserted
228         if (i->first->GetImage() != 0)
229           {
230             // Already read : ok :signal the user
231             UpdateUnloadPriority(i->first,priority);
232             SignalImageRead(i->first,false);
233             return;
234           }
235         /// Already requested : change the priority
236         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
237         pitl->SetPriority(priority);
238         // Already in queue
239         if (pitl->Index()>=0) 
240           {
241             // Re-sort the queue
242             mQueue.upsort(pitl->Index());
243           }
244         // Not read but not in queue = being read = ok
245         else 
246           {
247             
248           }
249       }
250     else 
251       {
252         // Never requested before or unloaded 
253         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
254         mImages[pitl] = 0;
255         mQueue.insert(pitl);
256       }
257   }
258   //=====================================================================
259
260   //=====================================================================
261   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
262   (const std::string& filename,
263    MultiThreadImageReaderUser::EventType e,
264    vtkImageData* image)
265   {
266     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
267         (filename == mRequestedFilename))
268       {
269         mRequestedImage = image;
270       }
271     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
272       {
273         mNumberOfThreadedReadersRunning++;
274         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
275       }
276     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
277       {
278         mNumberOfThreadedReadersRunning--;
279         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
280       }
281   }
282   //=====================================================================
283
284   //=====================================================================
285   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
286   {
287          // Start();
288     //    std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
289     //        <<std::endl;
290     
291     do 
292       {
293         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
294                 
295                   if (mNumberOfThreadedReadersRunning==0)
296 //      if (mThreadedImageReaderList.size()==0)
297           {
298             ImageToLoad itl(this,filename);
299             ImageMapType::iterator i = mImages.find(&itl);
300             if (i!=mImages.end())
301               {
302                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
303                 // Already inserted
304                 if (pitl->GetImage() != 0)
305                   {
306                     // Already read
307                     UpdateUnloadPriority(pitl,
308                                          GetMaximalPriorityWithoutLocking()+1);
309                     return pitl->GetImage();
310                   }
311               }
312             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
313             mImages[pitl] = 0;
314             pitl->SetImage(mReader->ReadImage(filename));
315             UpdateUnloadPriority(pitl,
316                                  GetMaximalPriorityWithoutLocking()+1);
317             return pitl->GetImage();
318           }
319         
320         mRequestedFilename = filename;
321         mRequestedImage = 0;
322         ImageToLoad itl(this,filename);
323         ImageMapType::iterator i = mImages.find(&itl);
324         if (i!=mImages.end())
325           {
326             // Already inserted in queue
327             if (i->first->GetImage() != 0)
328               {
329                 // Already read : ok : return it 
330                 return i->first->GetImage();
331               }
332             /// Already requested : change the priority
333               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
334               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
335               pitl->SetUser( this );
336               // Already in queue
337               if (pitl->Index()>=0) 
338                 {
339                   // Re-sort the queue
340                   mQueue.upsort(pitl->Index());
341                 }
342               // Not read but not in queue = being read = ok
343               else 
344                 {
345                   pitl->SetUser( this );
346                 }
347           }
348         else 
349           {
350             
351             // Never requested before or unloaded 
352             ImageToLoadPtr pitl = 
353               new ImageToLoad(this,filename,
354                               GetMaximalPriorityWithoutLocking() + 1);
355             mImages[pitl] = 0;
356             mQueue.insert(pitl);
357           }
358       }
359     while (0);
360
361     //    std::cout << "Waiting..."<<std::endl;
362
363     // Waiting that it is read
364     int n = 0;
365     do 
366       {
367         //      std::cout << n++ << std::endl;
368         wxMilliSleep(10);
369         do 
370           {
371             //      wxMutexLocker lock(mMutex);
372             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
373             if (mRequestedImage!=0) 
374               {
375                 return mRequestedImage;
376               } 
377           }
378         while (0);
379       }
380     while (true);
381     // 
382     
383   }
384   //=====================================================================
385   
386   //=====================================================================
387   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
388                                                bool purge)
389   {
390     
391 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
392     //    std::cout << "this="<<this <<std::endl;
393     //    std::cout << "user="<<p->GetUser() <<std::endl;
394
395     if ( p->GetUser() == this ) 
396       GetMultiThreadImageReaderUserMutex().Unlock();
397
398     p->GetUser()->MultiThreadImageReaderSendEvent
399       (p->GetFilename(),
400        MultiThreadImageReaderUser::ImageLoaded,
401        p->GetImage());
402
403     /*
404       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
405       BUGGY : TO FIX 
406     */
407     if (!purge)  return;
408     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
409
410     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
411            
412     mUnloadQueue.insert(p);
413     p->GetImage()->UpdateInformation();
414     p->GetImage()->PropagateUpdateExtent();
415     long ImMem = p->GetImage()->GetEstimatedMemorySize();
416     mTotalMem += ImMem;
417
418     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
419     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
420
421     //  return;
422
423     while (mTotalMem > mTotalMemMax)
424       {
425         GimmickMessage(5,
426                        "   ! Exceeded max of "
427                        << mTotalMemMax << " Ko : unloading oldest image ... "
428                        << std::endl);
429         if ( mUnloadQueue.size() <= 1 ) 
430           {
431              GimmickMessage(5,
432                             "   Only one image : cannot load AND unload it !!"
433                             <<std::endl);
434             break; 
435             
436           }
437         ImageToLoadPtr unload = mUnloadQueue.remove_top();
438         MultiThreadImageReaderUser* user = unload->GetUser();
439
440         /*
441         if ((user!=0)&&(user!=this)) 
442           {
443             user->GetMultiThreadImageReaderUserMutex().Lock();
444           }
445         */
446
447         std::string filename = unload->GetFilename();
448
449         GimmickMessage(5,"'" << filename << "'" << std::endl);
450         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
451
452         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
453
454
455         if (user!=0) 
456           {
457             //      std::cout << "unlock..."<<std::endl;
458             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
459             //      std::cout << "event"<<std::endl;
460             user->MultiThreadImageReaderSendEvent
461               (filename,
462                MultiThreadImageReaderUser::ImageUnloaded,
463                0);
464             //      std::cout << "event ok"<<std::endl;
465
466           }     
467
468         if (unload->Index()>=0)
469           {
470             // GimmickMessage(5,"still in queue"<<std::endl);
471           }
472         unload->Index() = -1;
473
474
475         ImageMapType::iterator it = mImages.find(unload);
476         if (it!=mImages.end())
477           {
478             mImages.erase(it);
479           }
480         //          std::cout << "delete..."<<std::endl;
481         delete unload;
482         //          std::cout << "delete ok."<<std::endl;
483
484       }
485     
486   
487   }
488   //=====================================================================
489
490   //=====================================================================
491   int MultiThreadImageReader::GetMaximalPriority()
492   { 
493     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
494     return GetMaximalPriorityWithoutLocking();
495   }
496   //=====================================================================
497
498
499   //=====================================================================
500   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
501   { 
502     long max = 0;
503     if (mQueue.size()>0) 
504       {
505         max = mQueue.top()->GetPriority();
506       }
507     if (mUnloadQueue.size()>0)
508       {
509         int max2 = mUnloadQueue.top()->GetPriority();
510         if (max2>max) max=max2;
511       }
512     return max;
513   }
514   //=====================================================================
515
516
517   //=====================================================================
518   //=====================================================================
519   //=====================================================================
520   //=====================================================================
521
522   //=====================================================================
523   void*  ThreadedImageReader::Entry()
524   {
525     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
526     //                << std::endl;
527
528     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
529       ("",
530        MultiThreadImageReaderUser::ThreadedReaderStarted,
531        0);
532
533     // While was not deleted 
534     while (!TestDestroy())
535       {
536         //      std::cout << "### Thread "<<GetCurrentId()<<" waiting for image"
537         //        << std::endl;
538           
539         // Lock the mutex
540         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
541         //mMutex.Lock();
542         // If image in queue
543         if (mMultiThreadImageReader->mQueue.size()>0)
544           {
545             MultiThreadImageReader::ImageToLoadPtr i = 
546               mMultiThreadImageReader->mQueue.remove_top();
547
548             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
549             //mMutex.Unlock();
550
551             
552             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
553             //                << i->GetFilename() << "'" << std::endl;
554             
555             // Do the job
556             vtkImageData* im = Read(i->GetFilename());
557
558             // Store it in the map
559             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
560             //mMutex.Lock();
561             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
562             MultiThreadImageReader::ImageMapType::iterator it = 
563               mMultiThreadImageReader->mImages.find(&itl);
564             MultiThreadImageReader::ImageToLoadPtr 
565               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
566               (it->first);
567             pitl->SetImage(im);
568             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
569             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
570             
571             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
572             //                << i->GetFilename() << "' : DONE" << std::endl;
573             
574           }
575         else 
576           {
577             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
578             //mMutex.Unlock();
579             // Wait a little to avoid blocking 
580             Sleep(10);
581           }
582       };
583     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
584     //                << std::endl;
585        
586     return 0;
587   }
588   //=====================================================================
589
590   //=====================================================================
591   void ThreadedImageReader::OnExit()
592   {
593     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
594       ("",
595        MultiThreadImageReaderUser::ThreadedReaderStopped,
596        0);
597   }
598   //=====================================================================
599
600   //=====================================================================
601   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
602   {
603     return mReader.ReadImage(filename);
604   }
605   //=====================================================================
606
607 } // namespace creaImageIO