]> Creatis software - creaImageIO.git/blob - src2/creaImageIOMultiThreadImageReader.cpp
7dae59112f77e1726d0ead210863a8936f6a403a
[creaImageIO.git] / src2 / creaImageIOMultiThreadImageReader.cpp
1 #include <creaImageIOMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 #include <wx/utils.h>
4 #include <creaImageIOSystem.h>
5
6
7 namespace creaImageIO
8 {
9
10   //=====================================================================
11   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
12   ( const std::string& filename,
13     EventType type,
14     vtkImageData* image)
15   {
16     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
17
18     this->OnMultiThreadImageReaderEvent(filename,type,image);
19   }
20   //=====================================================================
21
22   //=====================================================================
23   class ThreadedImageReader: public wxThread
24   {
25   public:
26     ThreadedImageReader(MultiThreadImageReader* tir) :
27       mMultiThreadImageReader(tir)
28     {}
29
30     void* Entry();
31     void  OnExit();
32
33     vtkImageData* Read(const std::string& filename);
34     
35         struct deleter
36         {
37                 void operator()(ThreadedImageReader* p)
38                 {
39                         p->Delete();
40                 }
41         };
42         friend struct deleter;
43
44
45   private:
46     ImageReader mReader;
47     MultiThreadImageReader* mMultiThreadImageReader;
48         
49   };
50
51  
52   //=====================================================================
53
54   
55   //=====================================================================
56   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
57     : //mDoNotSignal(false),
58       mReader(0),
59       mTotalMem(0),
60       mTotalMemMax(1000000)
61   {
62     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
63     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
64
65     // Create the threads
66     for (int i=0; i<number_of_threads; i++) 
67       {
68                   //ThreadedImageReader* t = new ThreadedImageReader(this);
69                   boost::shared_ptr<ThreadedImageReader> t(new ThreadedImageReader(this), ThreadedImageReader::deleter());
70         mThreadedImageReaderList.push_back(t);
71          std::cout << "  ===> Thread "<<i
72                               <<" successfully added"<< std::endl;
73       }
74     mNumberOfThreadedReadersRunning = 0;
75     // Init the queue
76     mQueue.set(mComparator);
77     mQueue.set(mIndexer);
78     // 
79     // no thread : alloc self reader
80 //    if (number_of_threads==0)
81 //      {
82         mReader = new ImageReader();
83 //      }
84   }
85   //=====================================================================
86
87
88   //=====================================================================
89   bool MultiThreadImageReader::Start()
90   {
91
92     //    std::cout << "#### MultiThreadImageReader::Start()"
93     //                <<std::endl;
94           if (mNumberOfThreadedReadersRunning > 0) return true;
95           
96     ThreadedImageReaderListType::iterator i;
97     for (i =mThreadedImageReaderList.begin();
98          i!=mThreadedImageReaderList.end();
99          i++)
100       {
101         (*i)->Create();
102         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
103           {
104             std::cout << "ERROR starting a thread"<< std::endl;
105             return false;
106           }
107         else 
108           {
109                     std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
110                               <<" successfully created"<< std::endl;
111             
112           }
113       }
114     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
115     //    std::cout << "EO Start : #Threads running = "
116     //                << mNumberOfThreadedReadersRunning<<std::endl;
117
118     return true;
119   }
120   //=====================================================================
121
122   //=====================================================================
123   void MultiThreadImageReader::Stop()
124   { 
125 //                  std::cout << "#### MultiThreadImageReader::Stop()"
126 //            <<std::endl;
127   //  std::cout << "Sending stop order to the threads..."<<std::endl;
128
129     ThreadedImageReaderListType::iterator i;
130     for (i =mThreadedImageReaderList.begin();
131          i!=mThreadedImageReaderList.end();
132          i++)
133       { std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
134                               <<" successfully stopped"<< std::endl;
135                   if((*i)->IsAlive())
136                   {
137                          //(*i)->Delete();
138                   }
139       }
140  //  mThreadedImageReaderList.clear();
141     // Wait a little to be sure that all threads have stopped
142     // A better way to do this ?
143     //    wxMilliSleep(1000);
144     // New method : the threads generate a stop event when they have finished
145     // We wait until all threads have stopped
146 //        std::cout << "Waiting for stop signals..."<<std::endl;
147     do 
148       {
149         // Sleep a little
150                 wxMilliSleep(10);
151         // Lock
152         {
153           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
154 //                std::cout << "#Threads running = "
155 //                          << mNumberOfThreadedReadersRunning<<std::endl;
156           // Break if all readers have stopped
157           if (mNumberOfThreadedReadersRunning <= 0) 
158             {
159               break;
160             }
161         }
162       } 
163     while (true);
164 //        std::cout << "All threads stopped : OK "<<std::endl;
165
166     ImageMapType::iterator j;
167     for (j =mImages.begin();
168          j!=mImages.end();
169          ++j)
170
171       {
172         delete j->first;
173       }
174     mImages.clear();
175   }
176   //=====================================================================
177
178   //=====================================================================
179   MultiThreadImageReader::~MultiThreadImageReader()
180   {
181     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
182     //        <<std::endl;
183     Stop();
184     if (mReader) delete mReader;
185         mThreadedImageReaderList.clear();
186   }
187   //=====================================================================
188
189   //=====================================================================
190   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
191                                                     int priority)
192   {
193     // not in unload queue : ciao
194     if (p->UnloadIndex()<0) return;
195     int old_prio = p->GetPriority();
196     if (priority > old_prio) 
197       {
198         p->SetPriority(priority);
199         mUnloadQueue.downsort(p->UnloadIndex());
200       }
201     else if ( old_prio > priority )
202       {
203         p->SetPriority(priority);
204         mUnloadQueue.upsort(p->UnloadIndex());
205      }
206   }
207   //=====================================================================
208
209   //=====================================================================
210   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
211                                         const std::string& filename, 
212                                         int priority )
213   {
214         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
215     
216           if (mNumberOfThreadedReadersRunning==0)
217 //    if (mThreadedImageReaderList.size()==0) 
218       {
219         // no detached reader : use self reader
220         ImageToLoad itl(user,filename);
221         ImageMapType::iterator i = mImages.find(&itl);
222         if (i!=mImages.end())
223           {
224             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
225             // Already inserted
226             if (pitl->GetImage() != 0)
227               {
228                 // Already read
229                 pitl->SetUser(user);
230                 UpdateUnloadPriority(pitl,priority);
231                 SignalImageRead(pitl,false);
232                 return; // pitl->GetImage();
233               }
234           }
235         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
236         mImages[pitl] = 0;
237         pitl->SetImage(mReader->ReadImage(filename));
238         UpdateUnloadPriority(pitl,priority);
239         SignalImageRead(pitl,true);
240         //      return pitl->GetImage();
241         return;
242       }
243
244
245     ImageToLoad itl(user,filename);
246     ImageMapType::iterator i = mImages.find(&itl);
247     if (i!=mImages.end())
248       {
249         // Already inserted
250         if (i->first->GetImage() != 0)
251           {
252             // Already read : ok :signal the user
253             UpdateUnloadPriority(i->first,priority);
254             SignalImageRead(i->first,false);
255             return;
256           }
257         /// Already requested : change the priority
258         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
259         pitl->SetPriority(priority);
260         // Already in queue
261         if (pitl->Index()>=0) 
262           {
263             // Re-sort the queue
264             mQueue.upsort(pitl->Index());
265           }
266         // Not read but not in queue = being read = ok
267         else 
268           {
269             
270           }
271       }
272     else 
273       {
274         // Never requested before or unloaded 
275         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
276         mImages[pitl] = 0;
277         mQueue.insert(pitl);
278       }
279   }
280   //=====================================================================
281
282   //=====================================================================
283   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
284   (const std::string& filename,
285    MultiThreadImageReaderUser::EventType e,
286    vtkImageData* image)
287   {
288     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
289         (filename == mRequestedFilename))
290       {
291         mRequestedImage = image;
292       }
293     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
294       {
295         mNumberOfThreadedReadersRunning++;
296         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
297       }
298     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
299       {
300         
301                  mNumberOfThreadedReadersRunning--;
302         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
303       }
304   }
305   //=====================================================================
306
307   //=====================================================================
308   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
309   {
310          // Start();
311     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
312     //           <<std::endl;
313     
314     do 
315       {
316         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
317                 
318         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
319         //             <<"') lock ok"
320         //               <<std::endl;
321     
322         //                if (mNumberOfThreadedReadersRunning==0)
323         //      if (mThreadedImageReaderList.size()==0)
324         if (true)
325           {
326             ImageToLoad itl(this,filename);
327             ImageMapType::iterator i = mImages.find(&itl);
328             if (i!=mImages.end())
329               {
330                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
331                 // Already inserted
332                 if (pitl->GetImage() != 0)
333                   {
334                     // Already read
335                     UpdateUnloadPriority(pitl,
336                                          GetMaximalPriorityWithoutLocking()+1);
337                     return pitl->GetImage();
338                   }
339               }
340             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
341             mImages[pitl] = 0;
342             pitl->SetImage(mReader->ReadImage(filename));
343             UpdateUnloadPriority(pitl,
344                                  GetMaximalPriorityWithoutLocking()+1);
345             return pitl->GetImage();
346           }
347
348         /*      
349         mRequestedFilename = filename;
350         mRequestedImage = 0;
351         ImageToLoad itl(this,filename);
352         ImageMapType::iterator i = mImages.find(&itl);
353         if (i!=mImages.end())
354           {
355             // Already inserted in queue
356             if (i->first->GetImage() != 0)
357               {
358                 // Already read : ok : return it 
359                 return i->first->GetImage();
360               }
361             /// Already requested : change the priority
362               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
363               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
364               pitl->SetUser( this );
365               // Already in queue
366               if (pitl->Index()>=0) 
367                 {
368                   // Re-sort the queue
369                   mQueue.upsort(pitl->Index());
370                 }
371               // Not read but not in queue = being read = ok
372               else 
373                 {
374                   pitl->SetUser( this );
375                 }
376           }
377         else 
378           {
379             
380             // Never requested before or unloaded 
381             ImageToLoadPtr pitl = 
382               new ImageToLoad(this,filename,
383                               GetMaximalPriorityWithoutLocking() + 1);
384             mImages[pitl] = 0;
385             mQueue.insert(pitl);
386           }
387         */
388       }
389     while (0);
390
391     //    std::cout << "Waiting..."<<std::endl;
392
393     /*
394     // Waiting that it is read
395     int n = 0;
396     do 
397       {
398         //      std::cout << n++ << std::endl;
399         wxMilliSleep(10);
400         do 
401           {
402             //      wxMutexLocker lock(mMutex);
403             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
404             if (mRequestedImage!=0) 
405               {
406                 return mRequestedImage;
407               } 
408           }
409         while (0);
410       }
411     while (true);
412     // 
413     */
414   }
415   //=====================================================================
416   
417   //=====================================================================
418   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
419                                                bool purge)
420   {
421     
422 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
423     //    std::cout << "this="<<this <<std::endl;
424     //    std::cout << "user="<<p->GetUser() <<std::endl;
425
426     if ( p->GetUser() == this ) 
427       GetMultiThreadImageReaderUserMutex().Unlock();
428
429     p->GetUser()->MultiThreadImageReaderSendEvent
430       (p->GetFilename(),
431        MultiThreadImageReaderUser::ImageLoaded,
432        p->GetImage());
433
434     /*
435       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
436       BUGGY : TO FIX 
437     */
438     if (!purge)  return;
439     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
440
441     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
442            
443     mUnloadQueue.insert(p);
444     p->GetImage()->UpdateInformation();
445     p->GetImage()->PropagateUpdateExtent();
446     long ImMem = p->GetImage()->GetEstimatedMemorySize();
447     mTotalMem += ImMem;
448
449     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
450     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
451
452     //  return;
453
454     while (mTotalMem > mTotalMemMax)
455       {
456         GimmickMessage(5,
457                        "   ! Exceeded max of "
458                        << mTotalMemMax << " Ko : unloading oldest image ... "
459                        << std::endl);
460         if ( mUnloadQueue.size() <= 1 ) 
461           {
462              GimmickMessage(5,
463                             "   Only one image : cannot load AND unload it !!"
464                             <<std::endl);
465             break; 
466             
467           }
468         ImageToLoadPtr unload = mUnloadQueue.remove_top();
469         MultiThreadImageReaderUser* user = unload->GetUser();
470
471         /*
472         if ((user!=0)&&(user!=this)) 
473           {
474             user->GetMultiThreadImageReaderUserMutex().Lock();
475           }
476         */
477
478         std::string filename = unload->GetFilename();
479
480         GimmickMessage(5,"'" << filename << "'" << std::endl);
481         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
482
483         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
484
485
486         if (user!=0) 
487           {
488             //      std::cout << "unlock..."<<std::endl;
489             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
490             //      std::cout << "event"<<std::endl;
491             user->MultiThreadImageReaderSendEvent
492               (filename,
493                MultiThreadImageReaderUser::ImageUnloaded,
494                0);
495             //      std::cout << "event ok"<<std::endl;
496
497           }     
498
499         if (unload->Index()>=0)
500           {
501             // GimmickMessage(5,"still in queue"<<std::endl);
502           }
503         unload->Index() = -1;
504
505
506         ImageMapType::iterator it = mImages.find(unload);
507         if (it!=mImages.end())
508           {
509             mImages.erase(it);
510           }
511         //          std::cout << "delete..."<<std::endl;
512         delete unload;
513         //          std::cout << "delete ok."<<std::endl;
514
515       }
516     
517   
518   }
519   //=====================================================================
520
521   //=====================================================================
522   int MultiThreadImageReader::GetMaximalPriority()
523   { 
524     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
525     return GetMaximalPriorityWithoutLocking();
526   }
527   //=====================================================================
528
529
530   //=====================================================================
531   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
532   { 
533     long max = 0;
534     if (mQueue.size()>0) 
535       {
536         max = mQueue.top()->GetPriority();
537       }
538     if (mUnloadQueue.size()>0)
539       {
540         int max2 = mUnloadQueue.top()->GetPriority();
541         if (max2>max) max=max2;
542       }
543     return max;
544   }
545   //=====================================================================
546
547
548   //=====================================================================
549   //=====================================================================
550   //=====================================================================
551   //=====================================================================
552
553   //=====================================================================
554   void*  ThreadedImageReader::Entry()
555   {
556     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
557     //                << std::endl;
558
559     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
560       ("",
561        MultiThreadImageReaderUser::ThreadedReaderStarted,
562        0);
563
564     // While was not deleted 
565     while (!TestDestroy())
566       {
567                 //std::cout << "### Thread "<<GetCurrentId()<<" still alive"  << std::endl;
568           
569         // Lock the mutex
570         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
571         //mMutex.Lock();
572         // If image in queue
573         if (mMultiThreadImageReader->mQueue.size()>0)
574           {
575             MultiThreadImageReader::ImageToLoadPtr i = 
576               mMultiThreadImageReader->mQueue.remove_top();
577
578             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
579             //mMutex.Unlock();
580
581             
582             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
583             //                << i->GetFilename() << "'" << std::endl;
584             
585             // Do the job
586             vtkImageData* im = Read(i->GetFilename());
587
588             // Store it in the map
589             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
590             //mMutex.Lock();
591             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
592             MultiThreadImageReader::ImageMapType::iterator it = 
593               mMultiThreadImageReader->mImages.find(&itl);
594             MultiThreadImageReader::ImageToLoadPtr 
595               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
596               (it->first);
597             pitl->SetImage(im);
598             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
599             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
600             
601             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
602             //                << i->GetFilename() << "' : DONE" << std::endl;
603             
604           }
605         else 
606           {
607             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
608             //mMutex.Unlock();
609             // Wait a little to avoid blocking 
610             Sleep(10);
611           }
612       };
613     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
614     //                << std::endl;
615        
616     return 0;
617   }
618   //=====================================================================
619
620   //=====================================================================
621   void ThreadedImageReader::OnExit()
622   {
623     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
624       ("",
625        MultiThreadImageReaderUser::ThreadedReaderStopped,
626        0);
627   }
628   //=====================================================================
629
630   //=====================================================================
631   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
632   {
633     return mReader.ReadImage(filename);
634   }
635   //=====================================================================
636
637 } // namespace creaImageIO