]> Creatis software - creaImageIO.git/blob - src/creaImageIOMultiThreadImageReader.cpp
8e5c716da4c5bbf03adef473575e7adc224d505e
[creaImageIO.git] / src / creaImageIOMultiThreadImageReader.cpp
1 #include <creaImageIOMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 #include <wx/utils.h>
4 #include <creaImageIOSystem.h>
5
6 #include <creaImageIOGimmick.h>
7 #ifdef _DEBUG
8 #define new DEBUG_NEW
9 #endif
10 namespace creaImageIO
11 {
12
13   //=====================================================================
14   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
15   ( const std::string& filename,
16     EventType type,
17     vtkImageData* image)
18   {
19     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
20
21     this->OnMultiThreadImageReaderEvent(filename,type,image);
22   }
23   //=====================================================================
24
25   //=====================================================================
26   class ThreadedImageReader: public wxThread
27   {
28   public:
29     ThreadedImageReader(MultiThreadImageReader* tir) :
30       mMultiThreadImageReader(tir)
31     {}
32
33     void* Entry();
34     void  OnExit();
35
36     vtkImageData* Read(const std::string& filename);
37     
38         struct deleter
39         {
40                 void operator()(ThreadedImageReader* p)
41                 {
42                         p->Delete();
43                 }
44         };
45         friend struct deleter;
46
47
48   private:
49     ImageReader mReader;
50     MultiThreadImageReader* mMultiThreadImageReader;
51         
52   };
53
54   //=====================================================================
55
56   
57   //=====================================================================
58   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
59     : //mDoNotSignal(false),
60       mReader(0),
61       mTotalMem(0),
62       mTotalMemMax(1000000)
63   {
64     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
65     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
66
67           mDone = false;
68     // Create the threads
69     for (int i=0; i<number_of_threads; i++) 
70       {
71                   //ThreadedImageReader* t = new ThreadedImageReader(this);
72                   boost::shared_ptr<ThreadedImageReader> t(new ThreadedImageReader(this), ThreadedImageReader::deleter());
73         mThreadedImageReaderList.push_back(t);
74          std::cout << "  ===> Thread "<<i
75                       <<" successfully added"<< std::endl;
76       }
77     mNumberOfThreadedReadersRunning = 0;
78     // Init the queue
79     mQueue.set(mComparator);
80     mQueue.set(mIndexer);
81     // 
82     // no thread : alloc self reader
83 //    if (number_of_threads==0)
84 //      {
85         mReader = new ImageReader();
86 //      }
87   }
88   //=====================================================================
89
90
91   //=====================================================================
92   bool MultiThreadImageReader::Start()
93   {
94
95     //    std::cout << "#### MultiThreadImageReader::Start()"
96     //                <<std::endl;
97           if (mNumberOfThreadedReadersRunning > 0) return true;
98           
99     ThreadedImageReaderListType::iterator i;
100     for (i =mThreadedImageReaderList.begin();
101          i!=mThreadedImageReaderList.end();
102          i++)
103       {
104         (*i)->Create();
105         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
106           {
107             std::cout << "ERROR starting a thread"<< std::endl;
108             return false;
109           }
110         else 
111           {
112                     std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
113                               <<" successfully created"<< std::endl;
114             
115           }
116       }
117     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
118     //    std::cout << "EO Start : #Threads running = "
119     //                << mNumberOfThreadedReadersRunning<<std::endl;
120
121     return true;
122   }
123   //=====================================================================
124
125   //=====================================================================
126   void MultiThreadImageReader::Stop()
127   { 
128 //                  std::cout << "#### MultiThreadImageReader::Stop()"
129 //            <<std::endl;
130   //  std::cout << "Sending stop order to the threads..."<<std::endl;
131           if (mDone) return;
132
133     ThreadedImageReaderListType::iterator i;
134     for (i =mThreadedImageReaderList.begin();
135          i!=mThreadedImageReaderList.end();
136          i++)
137       { std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
138                               <<" successfully stopped"<< std::endl;
139                   if((*i)->IsAlive())
140                   {(*i)->Pause();
141                           (*i).reset();
142                          //                       (*i)->Delete();
143                   }
144       }
145    mThreadedImageReaderList.clear();
146     // Wait a little to be sure that all threads have stopped
147     // A better way to do this ?
148     //    wxMilliSleep(1000);
149     // New method : the threads generate a stop event when they have finished
150     // We wait until all threads have stopped
151 //        std::cout << "Waiting for stop signals..."<<std::endl;
152     do 
153       {
154         // Sleep a little
155                 wxMilliSleep(10);
156         // Lock
157         {
158           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
159 //                std::cout << "#Threads running = "
160 //                          << mNumberOfThreadedReadersRunning<<std::endl;
161           // Break if all readers have stopped
162           if (mNumberOfThreadedReadersRunning <= 0) 
163             {
164               break;
165             }
166         }
167       } 
168     while (true);
169 //        std::cout << "All threads stopped : OK "<<std::endl;
170
171     ImageMapType::iterator j;
172     for (j =mImages.begin();
173          j!=mImages.end();
174          ++j)
175
176       {
177         delete j->first;
178       }
179     mImages.clear();
180         mDone = true;
181   }
182   //=====================================================================
183
184   //=====================================================================
185   MultiThreadImageReader::~MultiThreadImageReader()
186   {
187     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
188     //        <<std::endl;
189     Stop();
190     if (mReader) delete mReader;
191         mThreadedImageReaderList.clear();
192   }
193   //=====================================================================
194
195   //=====================================================================
196   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
197                                                     int priority)
198   {
199     // not in unload queue : ciao
200     if (p->UnloadIndex()<0) return;
201     int old_prio = p->GetPriority();
202     if (priority > old_prio) 
203       {
204         p->SetPriority(priority);
205         mUnloadQueue.downsort(p->UnloadIndex());
206       }
207     else if ( old_prio > priority )
208       {
209         p->SetPriority(priority);
210         mUnloadQueue.upsort(p->UnloadIndex());
211      }
212   }
213   //=====================================================================
214   // function to read attributes for a file
215   void MultiThreadImageReader::getAttributes(const std::string filename, 
216           std::map <std::string , std::string> &infos,std::vector<std::string> i_attr)
217   {
218           mReader->getAttributes(filename, infos, i_attr);
219   }
220
221   //=====================================================================
222   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
223                                         const std::string& filename, 
224                                         int priority )
225   {
226         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
227
228           if (mNumberOfThreadedReadersRunning==0)
229 //    if (mThreadedImageReaderList.size()==0) 
230       {
231         // no detached reader : use self reader
232         ImageToLoad itl(user,filename);
233         ImageMapType::iterator i = mImages.find(&itl);
234         if (i!=mImages.end())
235           {
236             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
237             // Already inserted
238             if (pitl->GetImage() != 0)
239               {
240                 // Already read
241                 pitl->SetUser(user);
242                 UpdateUnloadPriority(pitl,priority);
243                 SignalImageRead(pitl,false);
244                 return; // pitl->GetImage();
245               }
246           }
247         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
248         mImages[pitl] = 0;
249         pitl->SetImage(mReader->ReadImage(filename));
250         UpdateUnloadPriority(pitl,priority);
251         SignalImageRead(pitl,true);
252         //      return pitl->GetImage();
253         return;
254       }
255
256     ImageToLoad itl(user,filename);
257     ImageMapType::iterator i = mImages.find(&itl);
258     if (i!=mImages.end())
259       {
260         // Already inserted
261         if (i->first->GetImage() != 0)
262           {
263             // Already read : ok :signal the user
264             UpdateUnloadPriority(i->first,priority);
265             SignalImageRead(i->first,false);
266             return;
267           }
268         /// Already requested : change the priority
269         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
270         pitl->SetPriority(priority);
271         // Already in queue
272         if (pitl->Index()>=0) 
273           {
274             // Re-sort the queue
275             mQueue.upsort(pitl->Index());
276           }
277         // Not read but not in queue = being read = ok
278         else 
279           {
280             
281           }
282       }
283     else 
284       {
285         // Never requested before or unloaded 
286         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
287         mImages[pitl] = 0;
288         mQueue.insert(pitl);
289       }
290   }
291   //=====================================================================
292
293   //=====================================================================
294   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
295   (const std::string& filename,
296    MultiThreadImageReaderUser::EventType e,
297    vtkImageData* image)
298   {
299     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
300         (filename == mRequestedFilename))
301       {
302         mRequestedImage = image;
303       }
304     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
305       {
306         mNumberOfThreadedReadersRunning++;
307         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
308       }
309     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
310       {
311         
312                  mNumberOfThreadedReadersRunning--;
313         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
314       }
315   }
316   //=====================================================================
317
318   //=====================================================================
319   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
320   {
321          // Start();
322     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
323     //           <<std::endl;
324     
325     do 
326       {
327         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
328                 
329         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
330         //             <<"') lock ok"
331         //               <<std::endl;
332     
333         //                if (mNumberOfThreadedReadersRunning==0)
334         //      if (mThreadedImageReaderList.size()==0)
335         if (true)
336           {
337             ImageToLoad itl(this,filename);
338             ImageMapType::iterator i = mImages.find(&itl);
339             if (i!=mImages.end())
340               {
341                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
342                 // Already inserted
343                 if (pitl->GetImage() != 0)
344                   {
345                     // Already read
346                     UpdateUnloadPriority(pitl,
347                                          GetMaximalPriorityWithoutLocking()+1);
348                     return pitl->GetImage();
349                   }
350               }
351             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
352             mImages[pitl] = 0;
353             pitl->SetImage(mReader->ReadImage(filename));
354             UpdateUnloadPriority(pitl,
355                                  GetMaximalPriorityWithoutLocking()+1);
356             return pitl->GetImage();
357           }
358
359         /*      
360         mRequestedFilename = filename;
361         mRequestedImage = 0;
362         ImageToLoad itl(this,filename);
363         ImageMapType::iterator i = mImages.find(&itl);
364         if (i!=mImages.end())
365           {
366             // Already inserted in queue
367             if (i->first->GetImage() != 0)
368               {
369                 // Already read : ok : return it 
370                 return i->first->GetImage();
371               }
372             /// Already requested : change the priority
373               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
374               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
375               pitl->SetUser( this );
376               // Already in queue
377               if (pitl->Index()>=0) 
378                 {
379                   // Re-sort the queue
380                   mQueue.upsort(pitl->Index());
381                 }
382               // Not read but not in queue = being read = ok
383               else 
384                 {
385                   pitl->SetUser( this );
386                 }
387           }
388         else 
389           {
390             
391             // Never requested before or unloaded 
392             ImageToLoadPtr pitl = 
393               new ImageToLoad(this,filename,
394                               GetMaximalPriorityWithoutLocking() + 1);
395             mImages[pitl] = 0;
396             mQueue.insert(pitl);
397           }
398         */
399       }
400     while (0);
401
402     //    std::cout << "Waiting..."<<std::endl;
403
404     /*
405     // Waiting that it is read
406     int n = 0;
407     do 
408       {
409         //      std::cout << n++ << std::endl;
410         wxMilliSleep(10);
411         do 
412           {
413             //      wxMutexLocker lock(mMutex);
414             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
415             if (mRequestedImage!=0) 
416               {
417                 return mRequestedImage;
418               } 
419           }
420         while (0);
421       }
422     while (true);
423     // 
424     */
425   }
426   //=====================================================================
427   
428   //=====================================================================
429   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
430                                                bool purge)
431   {
432     
433 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
434     //    std::cout << "this="<<this <<std::endl;
435     //    std::cout << "user="<<p->GetUser() <<std::endl;
436
437     if ( p->GetUser() == this ) 
438       GetMultiThreadImageReaderUserMutex().Unlock();
439
440     p->GetUser()->MultiThreadImageReaderSendEvent
441       (p->GetFilename(),
442        MultiThreadImageReaderUser::ImageLoaded,
443        p->GetImage());
444
445     /*
446       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
447       BUGGY : TO FIX 
448     */
449     if (!purge)  return;
450     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
451
452     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
453            
454     mUnloadQueue.insert(p);
455     p->GetImage()->UpdateInformation();
456     p->GetImage()->PropagateUpdateExtent();
457     long ImMem = p->GetImage()->GetEstimatedMemorySize();
458     mTotalMem += ImMem;
459
460     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
461     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
462
463     //  return;
464
465     while (mTotalMem > mTotalMemMax)
466       {
467         GimmickMessage(5,
468                        "   ! Exceeded max of "
469                        << mTotalMemMax << " Ko : unloading oldest image ... "
470                        << std::endl);
471         if ( mUnloadQueue.size() <= 1 ) 
472           {
473              GimmickMessage(5,
474                             "   Only one image : cannot load AND unload it !!"
475                             <<std::endl);
476             break; 
477             
478           }
479         ImageToLoadPtr unload = mUnloadQueue.remove_top();
480         MultiThreadImageReaderUser* user = unload->GetUser();
481
482         /*
483         if ((user!=0)&&(user!=this)) 
484           {
485             user->GetMultiThreadImageReaderUserMutex().Lock();
486           }
487         */
488
489         std::string filename = unload->GetFilename();
490
491         GimmickMessage(5,"'" << filename << "'" << std::endl);
492         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
493
494         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
495
496         if (user!=0) 
497           {
498             //      std::cout << "unlock..."<<std::endl;
499             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
500             //      std::cout << "event"<<std::endl;
501             user->MultiThreadImageReaderSendEvent
502               (filename,
503                MultiThreadImageReaderUser::ImageUnloaded,
504                0);
505             //      std::cout << "event ok"<<std::endl;
506           }     
507
508         if (unload->Index()>=0)
509           {
510             // GimmickMessage(5,"still in queue"<<std::endl);
511           }
512         unload->Index() = -1;
513
514
515         ImageMapType::iterator it = mImages.find(unload);
516         if (it!=mImages.end())
517           {
518             mImages.erase(it);
519           }
520         //          std::cout << "delete..."<<std::endl;
521         delete unload;
522         //          std::cout << "delete ok."<<std::endl;
523
524       }
525   }
526   //=====================================================================
527
528   //=====================================================================
529   int MultiThreadImageReader::GetMaximalPriority()
530   { 
531     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
532     return GetMaximalPriorityWithoutLocking();
533   }
534   //=====================================================================
535
536
537   //=====================================================================
538   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
539   { 
540     long max = 0;
541     if (mQueue.size()>0) 
542       {
543         max = mQueue.top()->GetPriority();
544       }
545     if (mUnloadQueue.size()>0)
546       {
547         int max2 = mUnloadQueue.top()->GetPriority();
548         if (max2>max) max=max2;
549       }
550     return max;
551   }
552   //=====================================================================
553
554
555   //=====================================================================
556   //=====================================================================
557   //=====================================================================
558   //=====================================================================
559
560   //=====================================================================
561   void*  ThreadedImageReader::Entry()
562   {
563     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
564     //                << std::endl;
565
566     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
567       ("",
568        MultiThreadImageReaderUser::ThreadedReaderStarted,
569        0);
570
571     // While was not deleted 
572     while (!TestDestroy())
573       {
574                 //std::cout << "### Thread "<<GetCurrentId()<<" still alive"  << std::endl;
575           
576         // Lock the mutex
577         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
578         //mMutex.Lock();
579         // If image in queue
580         if (mMultiThreadImageReader->mQueue.size()>0)
581           {
582             MultiThreadImageReader::ImageToLoadPtr i = 
583               mMultiThreadImageReader->mQueue.remove_top();
584
585             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
586             //mMutex.Unlock();
587
588             
589             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
590             //                << i->GetFilename() << "'" << std::endl;
591             
592             // Do the job
593             vtkImageData* im = Read(i->GetFilename());
594
595             // Store it in the map
596             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
597             //mMutex.Lock();
598             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
599             MultiThreadImageReader::ImageMapType::iterator it = 
600               mMultiThreadImageReader->mImages.find(&itl);
601             MultiThreadImageReader::ImageToLoadPtr 
602               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
603               (it->first);
604             pitl->SetImage(im);
605             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
606             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
607             
608             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
609             //                << i->GetFilename() << "' : DONE" << std::endl;
610             
611           }
612         else 
613           {
614             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
615             //mMutex.Unlock();
616             // Wait a little to avoid blocking 
617             Sleep(10);
618           }
619       };
620     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
621     //                << std::endl;
622        
623     return 0;
624   }
625   //=====================================================================
626
627   //=====================================================================
628   void ThreadedImageReader::OnExit()
629   {
630     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
631       ("",
632        MultiThreadImageReaderUser::ThreadedReaderStopped,
633        0);
634   }
635   //=====================================================================
636
637   //=====================================================================
638   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
639   {
640     return mReader.ReadImage(filename);
641   }
642   //=====================================================================
643
644 } // namespace creaImageIO