]> Creatis software - creaImageIO.git/blob - src/creaImageIOMultiThreadImageReader.cpp
Initial revision
[creaImageIO.git] / src / creaImageIOMultiThreadImageReader.cpp
1 #include <creaImageIOMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 #include <wx/utils.h>
4
5 namespace creaImageIO
6 {
7
8   //=====================================================================
9   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
10   ( const std::string& filename,
11     EventType type,
12     vtkImageData* image)
13   {
14     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
15     this->OnMultiThreadImageReaderEvent(filename,type,image);
16   }
17   //=====================================================================
18
19   //=====================================================================
20   class ThreadedImageReader: public wxThread
21   {
22   public:
23     ThreadedImageReader(MultiThreadImageReader* tir) :
24       mMultiThreadImageReader(tir)
25     {}
26
27     void* Entry();
28     void  OnExit();
29
30     vtkImageData* Read(const std::string& filename);
31     
32
33   private:
34     ImageReader mReader;
35     MultiThreadImageReader* mMultiThreadImageReader;
36   };
37   //=====================================================================
38
39   
40   //=====================================================================
41   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
42     : //mDoNotSignal(false),
43       mReader(0),
44       mTotalMem(0),
45       mTotalMemMax(10000)
46   {
47     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
48     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
49
50     // Create the threads
51     for (int i=0; i<number_of_threads; i++) 
52       {
53         ThreadedImageReader* t = new ThreadedImageReader(this);
54         mThreadedImageReaderList.push_back(t);
55       }
56     mNumberOfThreadedReadersRunning = 0;
57     // Init the queue
58     mQueue.set(mComparator);
59     mQueue.set(mIndexer);
60     // 
61     // no thread : alloc self reader
62     if (number_of_threads==0)
63       {
64         mReader = new ImageReader();
65       }
66   }
67   //=====================================================================
68
69
70   //=====================================================================
71   bool MultiThreadImageReader::Start()
72   {
73
74     //    std::cout << "#### MultiThreadImageReader::Start()"
75     //                <<std::endl;
76     ThreadedImageReaderListType::iterator i;
77     for (i =mThreadedImageReaderList.begin();
78          i!=mThreadedImageReaderList.end();
79          i++)
80       {
81         (*i)->Create();
82         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
83           {
84             std::cout << "ERROR starting a thread"<< std::endl;
85             return false;
86           }
87         else 
88           {
89             //      std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
90             //                <<" successfully created"<< std::endl;
91             
92           }
93       }
94     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
95     //    std::cout << "EO Start : #Threads running = "
96     //                << mNumberOfThreadedReadersRunning<<std::endl;
97
98     return true;
99   }
100   //=====================================================================
101
102   //=====================================================================
103   void MultiThreadImageReader::Stop()
104   { 
105 //                  std::cout << "#### MultiThreadImageReader::Stop()"
106 //            <<std::endl;
107   //  std::cout << "Sending stop order to the threads..."<<std::endl;
108
109     ThreadedImageReaderListType::iterator i;
110     for (i =mThreadedImageReaderList.begin();
111          i!=mThreadedImageReaderList.end();
112          i++)
113       {
114         (*i)->Delete();
115       }
116     mThreadedImageReaderList.clear();
117     // Wait a little to be sure that all threads have stopped
118     // A better way to do this ?
119     //    wxMilliSleep(1000);
120     // New method : the threads generate a stop event when they have finished
121     // We wait until all threads have stopped
122 //        std::cout << "Waiting for stop signals..."<<std::endl;
123     do 
124       {
125         // Sleep a little
126         wxMilliSleep(10);
127         // Lock
128         {
129           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
130 //                std::cout << "#Threads running = "
131 //                          << mNumberOfThreadedReadersRunning<<std::endl;
132           // Break if all readers have stopped
133           if (mNumberOfThreadedReadersRunning <= 0) 
134             {
135               break;
136             }
137         }
138       } 
139     while (true);
140 //        std::cout << "All threads stopped : OK "<<std::endl;
141
142     ImageMapType::iterator j;
143     for (j =mImages.begin();
144          j!=mImages.end();
145          ++j)
146
147       {
148         delete j->first;
149       }
150     mImages.clear();
151   }
152   //=====================================================================
153
154   //=====================================================================
155   MultiThreadImageReader::~MultiThreadImageReader()
156   {
157     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
158     //        <<std::endl;
159     Stop();
160     if (mReader) delete mReader;
161   }
162   //=====================================================================
163
164   //=====================================================================
165   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
166                                                     int priority)
167   {
168     // not in unload queue : ciao
169     if (p->UnloadIndex()<0) return;
170     int old_prio = p->GetPriority();
171     if (priority > old_prio) 
172       {
173         p->SetPriority(priority);
174         mUnloadQueue.downsort(p->UnloadIndex());
175       }
176     else if ( old_prio > priority )
177       {
178         p->SetPriority(priority);
179         mUnloadQueue.upsort(p->UnloadIndex());
180      }
181   }
182   //=====================================================================
183
184   //=====================================================================
185   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
186                                         const std::string& filename, 
187                                         int priority )
188   {
189     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
190     
191     if (mThreadedImageReaderList.size()==0) 
192       {
193         // no detached reader : use self reader
194         ImageToLoad itl(user,filename);
195         ImageMapType::iterator i = mImages.find(&itl);
196         if (i!=mImages.end())
197           {
198             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
199             // Already inserted
200             if (pitl->GetImage() != 0)
201               {
202                 // Already read
203                 pitl->SetUser(user);
204                 UpdateUnloadPriority(pitl,priority);
205                 SignalImageRead(pitl,false);
206                 return; // pitl->GetImage();
207               }
208           }
209         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
210         mImages[pitl] = 0;
211         pitl->SetImage(mReader->Read(filename));
212         UpdateUnloadPriority(pitl,priority);
213         SignalImageRead(pitl,true);
214         //      return pitl->GetImage();
215         return;
216       }
217
218
219     ImageToLoad itl(user,filename);
220     ImageMapType::iterator i = mImages.find(&itl);
221     if (i!=mImages.end())
222       {
223         // Already inserted
224         if (i->first->GetImage() != 0)
225           {
226             // Already read : ok :signal the user
227             UpdateUnloadPriority(i->first,priority);
228             SignalImageRead(i->first,false);
229             return;
230           }
231         /// Already requested : change the priority
232         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
233         pitl->SetPriority(priority);
234         // Already in queue
235         if (pitl->Index()>=0) 
236           {
237             // Re-sort the queue
238             mQueue.upsort(pitl->Index());
239           }
240         // Not read but not in queue = being read = ok
241         else 
242           {
243             
244           }
245       }
246     else 
247       {
248         // Never requested before or unloaded 
249         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
250         mImages[pitl] = 0;
251         mQueue.insert(pitl);
252       }
253   }
254   //=====================================================================
255
256   //=====================================================================
257   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
258   (const std::string& filename,
259    MultiThreadImageReaderUser::EventType e,
260    vtkImageData* image)
261   {
262     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
263         (filename == mRequestedFilename))
264       {
265         mRequestedImage = image;
266       }
267     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
268       {
269         mNumberOfThreadedReadersRunning++;
270         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
271       }
272     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
273       {
274         mNumberOfThreadedReadersRunning--;
275         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
276       }
277   }
278   //=====================================================================
279
280   //=====================================================================
281   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
282   {
283     
284     //    std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
285     //        <<std::endl;
286     
287     do 
288       {
289         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
290                 
291         if (mThreadedImageReaderList.size()==0)
292           {
293             ImageToLoad itl(this,filename);
294             ImageMapType::iterator i = mImages.find(&itl);
295             if (i!=mImages.end())
296               {
297                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
298                 // Already inserted
299                 if (pitl->GetImage() != 0)
300                   {
301                     // Already read
302                     UpdateUnloadPriority(pitl,
303                                          GetMaximalPriorityWithoutLocking()+1);
304                     return pitl->GetImage();
305                   }
306               }
307             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
308             mImages[pitl] = 0;
309             pitl->SetImage(mReader->Read(filename));
310             UpdateUnloadPriority(pitl,
311                                  GetMaximalPriorityWithoutLocking()+1);
312             return pitl->GetImage();
313           }
314         
315         mRequestedFilename = filename;
316         mRequestedImage = 0;
317         ImageToLoad itl(this,filename);
318         ImageMapType::iterator i = mImages.find(&itl);
319         if (i!=mImages.end())
320           {
321             // Already inserted in queue
322             if (i->first->GetImage() != 0)
323               {
324                 // Already read : ok : return it 
325                 return i->first->GetImage();
326               }
327             /// Already requested : change the priority
328               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
329               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
330               pitl->SetUser( this );
331               // Already in queue
332               if (pitl->Index()>=0) 
333                 {
334                   // Re-sort the queue
335                   mQueue.upsort(pitl->Index());
336                 }
337               // Not read but not in queue = being read = ok
338               else 
339                 {
340                   pitl->SetUser( this );
341                 }
342           }
343         else 
344           {
345             
346             // Never requested before or unloaded 
347             ImageToLoadPtr pitl = 
348               new ImageToLoad(this,filename,
349                               GetMaximalPriorityWithoutLocking() + 1);
350             mImages[pitl] = 0;
351             mQueue.insert(pitl);
352           }
353       }
354     while (0);
355
356     //    std::cout << "Waiting..."<<std::endl;
357
358     // Waiting that it is read
359     int n = 0;
360     do 
361       {
362         //      std::cout << n++ << std::endl;
363         wxMilliSleep(10);
364         do 
365           {
366             //      wxMutexLocker lock(mMutex);
367             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
368             if (mRequestedImage!=0) 
369               {
370                 return mRequestedImage;
371               } 
372           }
373         while (0);
374       }
375     while (true);
376     // 
377     
378   }
379   //=====================================================================
380   
381   //=====================================================================
382   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
383                                                bool purge)
384   {
385     //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
386     //    std::cout << "this="<<this <<std::endl;
387     //    std::cout << "user="<<p->GetUser() <<std::endl;
388
389     if ( p->GetUser() == this ) 
390       GetMultiThreadImageReaderUserMutex().Unlock();
391
392     p->GetUser()->MultiThreadImageReaderSendEvent
393       (p->GetFilename(),
394        MultiThreadImageReaderUser::ImageLoaded,
395        p->GetImage());
396
397     /*
398       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
399       BUGGY : TO FIX 
400     */
401     if (!purge)  return;
402
403     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
404            
405     mUnloadQueue.insert(p);
406     p->GetImage()->UpdateInformation();
407     p->GetImage()->PropagateUpdateExtent();
408     long ImMem = p->GetImage()->GetEstimatedMemorySize();
409     mTotalMem += ImMem;
410     //    std::cout << " ==> Total mem = "<<mTotalMem<<" Ko"<<std::endl;
411     while (mTotalMem > mTotalMemMax)
412       {
413         //      std::cout 
414         //                <<"   ! Exceeded max of "
415         //                << mTotalMemMax << " : unloading oldest image ... "
416         //                << std::endl;
417         if ( mUnloadQueue.size() <= 1 ) 
418           {
419             //      std::cout << "Only one image : cannot load AND unload it !!"
420             //                <<std::endl;
421             break; 
422
423           }
424         ImageToLoadPtr unload = mUnloadQueue.remove_top();
425         MultiThreadImageReaderUser* user = unload->GetUser();
426
427         if ((user!=0)&&(user!=this)) 
428           {
429             user->GetMultiThreadImageReaderUserMutex().Lock();
430           }
431
432
433         //      std::cout << "'" << unload->GetFilename() << "'" << std::endl;
434         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
435         //      std::cout << " ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl;
436         
437
438         std::string filename = unload->GetFilename();
439         if (unload->Index()>=0)
440           {
441             //   std::cout << "still in queue"<<std::endl;
442           }
443         unload->Index() = -1;
444
445
446         ImageMapType::iterator it = mImages.find(unload);
447         if (it!=mImages.end())
448           {
449             mImages.erase(it);
450           }
451         //          std::cout << "delete..."<<std::endl;
452         delete unload;
453         //          std::cout << "delete ok."<<std::endl;
454
455         if (user!=0) 
456           {
457             //      std::cout << "unlock..."<<std::endl;
458             user->GetMultiThreadImageReaderUserMutex().Unlock();
459             //      std::cout << "event"<<std::endl;
460             user->MultiThreadImageReaderSendEvent
461               (filename,
462                MultiThreadImageReaderUser::ImageUnloaded,
463                0);
464             //      std::cout << "event ok"<<std::endl;
465
466           }
467       }
468     
469   
470   }
471   //=====================================================================
472
473   //=====================================================================
474   int MultiThreadImageReader::GetMaximalPriority()
475   { 
476     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
477     return GetMaximalPriorityWithoutLocking();
478   }
479   //=====================================================================
480
481
482   //=====================================================================
483   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
484   { 
485     long max = 0;
486     if (mQueue.size()>0) 
487       {
488         max = mQueue.top()->GetPriority();
489       }
490     if (mUnloadQueue.size()>0)
491       {
492         int max2 = mUnloadQueue.top()->GetPriority();
493         if (max2>max) max=max2;
494       }
495     return max;
496   }
497   //=====================================================================
498
499
500   //=====================================================================
501   //=====================================================================
502   //=====================================================================
503   //=====================================================================
504
505   //=====================================================================
506   void*  ThreadedImageReader::Entry()
507   {
508     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
509     //                << std::endl;
510
511     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
512       ("",
513        MultiThreadImageReaderUser::ThreadedReaderStarted,
514        0);
515
516     // While was not deleted 
517     while (!TestDestroy())
518       {
519         //      std::cout << "### Thread "<<GetCurrentId()<<" waiting for image"
520         //        << std::endl;
521           
522         // Lock the mutex
523         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
524         //mMutex.Lock();
525         // If image in queue
526         if (mMultiThreadImageReader->mQueue.size()>0)
527           {
528             MultiThreadImageReader::ImageToLoadPtr i = 
529               mMultiThreadImageReader->mQueue.remove_top();
530
531             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
532             //mMutex.Unlock();
533
534             
535             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
536             //                << i->GetFilename() << "'" << std::endl;
537             
538             // Do the job
539             vtkImageData* im = Read(i->GetFilename());
540
541             // Store it in the map
542             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
543             //mMutex.Lock();
544             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
545             MultiThreadImageReader::ImageMapType::iterator it = 
546               mMultiThreadImageReader->mImages.find(&itl);
547             MultiThreadImageReader::ImageToLoadPtr 
548               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
549               (it->first);
550             pitl->SetImage(im);
551             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
552             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
553             
554             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
555             //                << i->GetFilename() << "' : DONE" << std::endl;
556             
557           }
558         else 
559           {
560             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
561             //mMutex.Unlock();
562             // Wait a little to avoid blocking 
563             Sleep(10);
564           }
565       };
566     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
567     //                << std::endl;
568        
569     return 0;
570   }
571   //=====================================================================
572
573   //=====================================================================
574   void ThreadedImageReader::OnExit()
575   {
576     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
577       ("",
578        MultiThreadImageReaderUser::ThreadedReaderStopped,
579        0);
580   }
581   //=====================================================================
582
583   //=====================================================================
584   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
585   {
586     return mReader.Read(filename);
587   }
588   //=====================================================================
589
590 } // namespace creaImageIO