]> Creatis software - creaImageIO.git/blob - src2/creaImageIOMultiThreadImageReader.cpp
9645f09f163337bb7a217dc95e08de406cf57c40
[creaImageIO.git] / src2 / creaImageIOMultiThreadImageReader.cpp
1 #include <creaImageIOMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 #include <wx/utils.h>
4 #include <creaImageIOSystem.h>
5
6 namespace creaImageIO
7 {
8
9   //=====================================================================
10   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
11   ( const std::string& filename,
12     EventType type,
13     vtkImageData* image)
14   {
15     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
16
17     this->OnMultiThreadImageReaderEvent(filename,type,image);
18   }
19   //=====================================================================
20
21   //=====================================================================
22   class ThreadedImageReader: public wxThread
23   {
24   public:
25     ThreadedImageReader(MultiThreadImageReader* tir) :
26       mMultiThreadImageReader(tir)
27     {}
28
29     void* Entry();
30     void  OnExit();
31
32     vtkImageData* Read(const std::string& filename);
33     
34         struct deleter
35         {
36                 void operator()(ThreadedImageReader* p)
37                 {
38                         p->Delete();
39                 }
40         };
41         friend struct deleter;
42
43
44   private:
45     ImageReader mReader;
46     MultiThreadImageReader* mMultiThreadImageReader;
47         
48   };
49
50   //=====================================================================
51
52   
53   //=====================================================================
54   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
55     : //mDoNotSignal(false),
56       mReader(0),
57       mTotalMem(0),
58       mTotalMemMax(1000000)
59   {
60     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
61     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
62
63     // Create the threads
64     for (int i=0; i<number_of_threads; i++) 
65       {
66                   //ThreadedImageReader* t = new ThreadedImageReader(this);
67                   boost::shared_ptr<ThreadedImageReader> t(new ThreadedImageReader(this), ThreadedImageReader::deleter());
68         mThreadedImageReaderList.push_back(t);
69          std::cout << "  ===> Thread "<<i
70                       <<" successfully added"<< std::endl;
71       }
72     mNumberOfThreadedReadersRunning = 0;
73     // Init the queue
74     mQueue.set(mComparator);
75     mQueue.set(mIndexer);
76     // 
77     // no thread : alloc self reader
78 //    if (number_of_threads==0)
79 //      {
80         mReader = new ImageReader();
81 //      }
82   }
83   //=====================================================================
84
85
86   //=====================================================================
87   bool MultiThreadImageReader::Start()
88   {
89
90     //    std::cout << "#### MultiThreadImageReader::Start()"
91     //                <<std::endl;
92           if (mNumberOfThreadedReadersRunning > 0) return true;
93           
94     ThreadedImageReaderListType::iterator i;
95     for (i =mThreadedImageReaderList.begin();
96          i!=mThreadedImageReaderList.end();
97          i++)
98       {
99         (*i)->Create();
100         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
101           {
102             std::cout << "ERROR starting a thread"<< std::endl;
103             return false;
104           }
105         else 
106           {
107                     std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
108                               <<" successfully created"<< std::endl;
109             
110           }
111       }
112     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
113     //    std::cout << "EO Start : #Threads running = "
114     //                << mNumberOfThreadedReadersRunning<<std::endl;
115
116     return true;
117   }
118   //=====================================================================
119
120   //=====================================================================
121   void MultiThreadImageReader::Stop()
122   { 
123 //                  std::cout << "#### MultiThreadImageReader::Stop()"
124 //            <<std::endl;
125   //  std::cout << "Sending stop order to the threads..."<<std::endl;
126
127     ThreadedImageReaderListType::iterator i;
128     for (i =mThreadedImageReaderList.begin();
129          i!=mThreadedImageReaderList.end();
130          i++)
131       { std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
132                               <<" successfully stopped"<< std::endl;
133                   if((*i)->IsAlive())
134                   {
135                           (*i).reset();
136                          //(*i)->Delete();
137                   }
138       }
139  //  mThreadedImageReaderList.clear();
140     // Wait a little to be sure that all threads have stopped
141     // A better way to do this ?
142     //    wxMilliSleep(1000);
143     // New method : the threads generate a stop event when they have finished
144     // We wait until all threads have stopped
145 //        std::cout << "Waiting for stop signals..."<<std::endl;
146     do 
147       {
148         // Sleep a little
149                 wxMilliSleep(10);
150         // Lock
151         {
152           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
153 //                std::cout << "#Threads running = "
154 //                          << mNumberOfThreadedReadersRunning<<std::endl;
155           // Break if all readers have stopped
156           if (mNumberOfThreadedReadersRunning <= 0) 
157             {
158               break;
159             }
160         }
161       } 
162     while (true);
163 //        std::cout << "All threads stopped : OK "<<std::endl;
164
165     ImageMapType::iterator j;
166     for (j =mImages.begin();
167          j!=mImages.end();
168          ++j)
169
170       {
171         delete j->first;
172       }
173     mImages.clear();
174   }
175   //=====================================================================
176
177   //=====================================================================
178   MultiThreadImageReader::~MultiThreadImageReader()
179   {
180     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
181     //        <<std::endl;
182     Stop();
183     if (mReader) delete mReader;
184         mThreadedImageReaderList.clear();
185   }
186   //=====================================================================
187
188   //=====================================================================
189   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
190                                                     int priority)
191   {
192     // not in unload queue : ciao
193     if (p->UnloadIndex()<0) return;
194     int old_prio = p->GetPriority();
195     if (priority > old_prio) 
196       {
197         p->SetPriority(priority);
198         mUnloadQueue.downsort(p->UnloadIndex());
199       }
200     else if ( old_prio > priority )
201       {
202         p->SetPriority(priority);
203         mUnloadQueue.upsort(p->UnloadIndex());
204      }
205   }
206   //=====================================================================
207
208   //=====================================================================
209   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
210                                         const std::string& filename, 
211                                         int priority )
212   {
213         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
214
215           if (mNumberOfThreadedReadersRunning==0)
216 //    if (mThreadedImageReaderList.size()==0) 
217       {
218         // no detached reader : use self reader
219         ImageToLoad itl(user,filename);
220         ImageMapType::iterator i = mImages.find(&itl);
221         if (i!=mImages.end())
222           {
223             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
224             // Already inserted
225             if (pitl->GetImage() != 0)
226               {
227                 // Already read
228                 pitl->SetUser(user);
229                 UpdateUnloadPriority(pitl,priority);
230                 SignalImageRead(pitl,false);
231                 return; // pitl->GetImage();
232               }
233           }
234         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
235         mImages[pitl] = 0;
236         pitl->SetImage(mReader->ReadImage(filename));
237         UpdateUnloadPriority(pitl,priority);
238         SignalImageRead(pitl,true);
239         //      return pitl->GetImage();
240         return;
241       }
242
243     ImageToLoad itl(user,filename);
244     ImageMapType::iterator i = mImages.find(&itl);
245     if (i!=mImages.end())
246       {
247         // Already inserted
248         if (i->first->GetImage() != 0)
249           {
250             // Already read : ok :signal the user
251             UpdateUnloadPriority(i->first,priority);
252             SignalImageRead(i->first,false);
253             return;
254           }
255         /// Already requested : change the priority
256         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
257         pitl->SetPriority(priority);
258         // Already in queue
259         if (pitl->Index()>=0) 
260           {
261             // Re-sort the queue
262             mQueue.upsort(pitl->Index());
263           }
264         // Not read but not in queue = being read = ok
265         else 
266           {
267             
268           }
269       }
270     else 
271       {
272         // Never requested before or unloaded 
273         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
274         mImages[pitl] = 0;
275         mQueue.insert(pitl);
276       }
277   }
278   //=====================================================================
279
280   //=====================================================================
281   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
282   (const std::string& filename,
283    MultiThreadImageReaderUser::EventType e,
284    vtkImageData* image)
285   {
286     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
287         (filename == mRequestedFilename))
288       {
289         mRequestedImage = image;
290       }
291     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
292       {
293         mNumberOfThreadedReadersRunning++;
294         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
295       }
296     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
297       {
298         
299                  mNumberOfThreadedReadersRunning--;
300         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
301       }
302   }
303   //=====================================================================
304
305   //=====================================================================
306   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
307   {
308          // Start();
309     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
310     //           <<std::endl;
311     
312     do 
313       {
314         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
315                 
316         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
317         //             <<"') lock ok"
318         //               <<std::endl;
319     
320         //                if (mNumberOfThreadedReadersRunning==0)
321         //      if (mThreadedImageReaderList.size()==0)
322         if (true)
323           {
324             ImageToLoad itl(this,filename);
325             ImageMapType::iterator i = mImages.find(&itl);
326             if (i!=mImages.end())
327               {
328                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
329                 // Already inserted
330                 if (pitl->GetImage() != 0)
331                   {
332                     // Already read
333                     UpdateUnloadPriority(pitl,
334                                          GetMaximalPriorityWithoutLocking()+1);
335                     return pitl->GetImage();
336                   }
337               }
338             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
339             mImages[pitl] = 0;
340             pitl->SetImage(mReader->ReadImage(filename));
341             UpdateUnloadPriority(pitl,
342                                  GetMaximalPriorityWithoutLocking()+1);
343             return pitl->GetImage();
344           }
345
346         /*      
347         mRequestedFilename = filename;
348         mRequestedImage = 0;
349         ImageToLoad itl(this,filename);
350         ImageMapType::iterator i = mImages.find(&itl);
351         if (i!=mImages.end())
352           {
353             // Already inserted in queue
354             if (i->first->GetImage() != 0)
355               {
356                 // Already read : ok : return it 
357                 return i->first->GetImage();
358               }
359             /// Already requested : change the priority
360               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
361               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
362               pitl->SetUser( this );
363               // Already in queue
364               if (pitl->Index()>=0) 
365                 {
366                   // Re-sort the queue
367                   mQueue.upsort(pitl->Index());
368                 }
369               // Not read but not in queue = being read = ok
370               else 
371                 {
372                   pitl->SetUser( this );
373                 }
374           }
375         else 
376           {
377             
378             // Never requested before or unloaded 
379             ImageToLoadPtr pitl = 
380               new ImageToLoad(this,filename,
381                               GetMaximalPriorityWithoutLocking() + 1);
382             mImages[pitl] = 0;
383             mQueue.insert(pitl);
384           }
385         */
386       }
387     while (0);
388
389     //    std::cout << "Waiting..."<<std::endl;
390
391     /*
392     // Waiting that it is read
393     int n = 0;
394     do 
395       {
396         //      std::cout << n++ << std::endl;
397         wxMilliSleep(10);
398         do 
399           {
400             //      wxMutexLocker lock(mMutex);
401             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
402             if (mRequestedImage!=0) 
403               {
404                 return mRequestedImage;
405               } 
406           }
407         while (0);
408       }
409     while (true);
410     // 
411     */
412   }
413   //=====================================================================
414   
415   //=====================================================================
416   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
417                                                bool purge)
418   {
419     
420 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
421     //    std::cout << "this="<<this <<std::endl;
422     //    std::cout << "user="<<p->GetUser() <<std::endl;
423
424     if ( p->GetUser() == this ) 
425       GetMultiThreadImageReaderUserMutex().Unlock();
426
427     p->GetUser()->MultiThreadImageReaderSendEvent
428       (p->GetFilename(),
429        MultiThreadImageReaderUser::ImageLoaded,
430        p->GetImage());
431
432     /*
433       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
434       BUGGY : TO FIX 
435     */
436     if (!purge)  return;
437     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
438
439     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
440            
441     mUnloadQueue.insert(p);
442     p->GetImage()->UpdateInformation();
443     p->GetImage()->PropagateUpdateExtent();
444     long ImMem = p->GetImage()->GetEstimatedMemorySize();
445     mTotalMem += ImMem;
446
447     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
448     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
449
450     //  return;
451
452     while (mTotalMem > mTotalMemMax)
453       {
454         GimmickMessage(5,
455                        "   ! Exceeded max of "
456                        << mTotalMemMax << " Ko : unloading oldest image ... "
457                        << std::endl);
458         if ( mUnloadQueue.size() <= 1 ) 
459           {
460              GimmickMessage(5,
461                             "   Only one image : cannot load AND unload it !!"
462                             <<std::endl);
463             break; 
464             
465           }
466         ImageToLoadPtr unload = mUnloadQueue.remove_top();
467         MultiThreadImageReaderUser* user = unload->GetUser();
468
469         /*
470         if ((user!=0)&&(user!=this)) 
471           {
472             user->GetMultiThreadImageReaderUserMutex().Lock();
473           }
474         */
475
476         std::string filename = unload->GetFilename();
477
478         GimmickMessage(5,"'" << filename << "'" << std::endl);
479         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
480
481         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
482
483         if (user!=0) 
484           {
485             //      std::cout << "unlock..."<<std::endl;
486             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
487             //      std::cout << "event"<<std::endl;
488             user->MultiThreadImageReaderSendEvent
489               (filename,
490                MultiThreadImageReaderUser::ImageUnloaded,
491                0);
492             //      std::cout << "event ok"<<std::endl;
493           }     
494
495         if (unload->Index()>=0)
496           {
497             // GimmickMessage(5,"still in queue"<<std::endl);
498           }
499         unload->Index() = -1;
500
501
502         ImageMapType::iterator it = mImages.find(unload);
503         if (it!=mImages.end())
504           {
505             mImages.erase(it);
506           }
507         //          std::cout << "delete..."<<std::endl;
508         delete unload;
509         //          std::cout << "delete ok."<<std::endl;
510
511       }
512   }
513   //=====================================================================
514
515   //=====================================================================
516   int MultiThreadImageReader::GetMaximalPriority()
517   { 
518     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
519     return GetMaximalPriorityWithoutLocking();
520   }
521   //=====================================================================
522
523
524   //=====================================================================
525   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
526   { 
527     long max = 0;
528     if (mQueue.size()>0) 
529       {
530         max = mQueue.top()->GetPriority();
531       }
532     if (mUnloadQueue.size()>0)
533       {
534         int max2 = mUnloadQueue.top()->GetPriority();
535         if (max2>max) max=max2;
536       }
537     return max;
538   }
539   //=====================================================================
540
541
542   //=====================================================================
543   //=====================================================================
544   //=====================================================================
545   //=====================================================================
546
547   //=====================================================================
548   void*  ThreadedImageReader::Entry()
549   {
550     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
551     //                << std::endl;
552
553     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
554       ("",
555        MultiThreadImageReaderUser::ThreadedReaderStarted,
556        0);
557
558     // While was not deleted 
559     while (!TestDestroy())
560       {
561                 //std::cout << "### Thread "<<GetCurrentId()<<" still alive"  << std::endl;
562           
563         // Lock the mutex
564         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
565         //mMutex.Lock();
566         // If image in queue
567         if (mMultiThreadImageReader->mQueue.size()>0)
568           {
569             MultiThreadImageReader::ImageToLoadPtr i = 
570               mMultiThreadImageReader->mQueue.remove_top();
571
572             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
573             //mMutex.Unlock();
574
575             
576             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
577             //                << i->GetFilename() << "'" << std::endl;
578             
579             // Do the job
580             vtkImageData* im = Read(i->GetFilename());
581
582             // Store it in the map
583             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
584             //mMutex.Lock();
585             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
586             MultiThreadImageReader::ImageMapType::iterator it = 
587               mMultiThreadImageReader->mImages.find(&itl);
588             MultiThreadImageReader::ImageToLoadPtr 
589               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
590               (it->first);
591             pitl->SetImage(im);
592             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
593             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
594             
595             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
596             //                << i->GetFilename() << "' : DONE" << std::endl;
597             
598           }
599         else 
600           {
601             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
602             //mMutex.Unlock();
603             // Wait a little to avoid blocking 
604             Sleep(10);
605           }
606       };
607     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
608     //                << std::endl;
609        
610     return 0;
611   }
612   //=====================================================================
613
614   //=====================================================================
615   void ThreadedImageReader::OnExit()
616   {
617     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
618       ("",
619        MultiThreadImageReaderUser::ThreadedReaderStopped,
620        0);
621   }
622   //=====================================================================
623
624   //=====================================================================
625   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
626   {
627     return mReader.ReadImage(filename);
628   }
629   //=====================================================================
630
631 } // namespace creaImageIO