]> Creatis software - creaImageIO.git/blob - src2/creaImageIOMultiThreadImageReader.cpp
Fixed Multi-thread reader start and stop errors and shows non private columns.
[creaImageIO.git] / src2 / creaImageIOMultiThreadImageReader.cpp
1 #include <creaImageIOMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 #include <wx/utils.h>
4 #include <creaImageIOSystem.h>
5
6 namespace creaImageIO
7 {
8
9   //=====================================================================
10   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
11   ( const std::string& filename,
12     EventType type,
13     vtkImageData* image)
14   {
15     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
16     this->OnMultiThreadImageReaderEvent(filename,type,image);
17   }
18   //=====================================================================
19
20   //=====================================================================
21   class ThreadedImageReader: public wxThread
22   {
23   public:
24     ThreadedImageReader(MultiThreadImageReader* tir) :
25       mMultiThreadImageReader(tir)
26     {}
27
28     void* Entry();
29     void  OnExit();
30
31     vtkImageData* Read(const std::string& filename);
32     
33
34   private:
35     ImageReader mReader;
36     MultiThreadImageReader* mMultiThreadImageReader;
37         
38   };
39   //=====================================================================
40
41   
42   //=====================================================================
43   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
44     : //mDoNotSignal(false),
45       mReader(0),
46       mTotalMem(0),
47       mTotalMemMax(1500000)
48   {
49     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
50     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
51
52     // Create the threads
53     for (int i=0; i<number_of_threads; i++) 
54       {
55         ThreadedImageReader* t = new ThreadedImageReader(this);
56         mThreadedImageReaderList.push_back(t);
57       }
58     mNumberOfThreadedReadersRunning = 0;
59     // Init the queue
60     mQueue.set(mComparator);
61     mQueue.set(mIndexer);
62     // 
63     // no thread : alloc self reader
64 //    if (number_of_threads==0)
65 //      {
66         mReader = new ImageReader();
67 //      }
68   }
69   //=====================================================================
70
71
72   //=====================================================================
73   bool MultiThreadImageReader::Start()
74   {
75
76     //    std::cout << "#### MultiThreadImageReader::Start()"
77     //                <<std::endl;
78           if (mNumberOfThreadedReadersRunning > 0) return true;
79           
80     ThreadedImageReaderListType::iterator i;
81     for (i =mThreadedImageReaderList.begin();
82          i!=mThreadedImageReaderList.end();
83          i++)
84       {
85         (*i)->Create();
86         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
87           {
88             std::cout << "ERROR starting a thread"<< std::endl;
89             return false;
90           }
91         else 
92           {
93             //      std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
94             //                <<" successfully created"<< std::endl;
95             
96           }
97       }
98     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
99     //    std::cout << "EO Start : #Threads running = "
100     //                << mNumberOfThreadedReadersRunning<<std::endl;
101
102     return true;
103   }
104   //=====================================================================
105
106   //=====================================================================
107   void MultiThreadImageReader::Stop()
108   { 
109 //                  std::cout << "#### MultiThreadImageReader::Stop()"
110 //            <<std::endl;
111   //  std::cout << "Sending stop order to the threads..."<<std::endl;
112
113     ThreadedImageReaderListType::iterator i;
114     for (i =mThreadedImageReaderList.begin();
115          i!=mThreadedImageReaderList.end();
116          i++)
117       {
118                   if((*i)->IsAlive())
119                   {
120                         (*i)->Delete();
121                   }
122       }
123     mThreadedImageReaderList.clear();
124     // Wait a little to be sure that all threads have stopped
125     // A better way to do this ?
126     //    wxMilliSleep(1000);
127     // New method : the threads generate a stop event when they have finished
128     // We wait until all threads have stopped
129 //        std::cout << "Waiting for stop signals..."<<std::endl;
130     do 
131       {
132         // Sleep a little
133         wxMilliSleep(10);
134         // Lock
135         {
136           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
137 //                std::cout << "#Threads running = "
138 //                          << mNumberOfThreadedReadersRunning<<std::endl;
139           // Break if all readers have stopped
140           if (mNumberOfThreadedReadersRunning <= 0) 
141             {
142               break;
143             }
144         }
145       } 
146     while (true);
147 //        std::cout << "All threads stopped : OK "<<std::endl;
148
149     ImageMapType::iterator j;
150     for (j =mImages.begin();
151          j!=mImages.end();
152          ++j)
153
154       {
155         delete j->first;
156       }
157     mImages.clear();
158   }
159   //=====================================================================
160
161   //=====================================================================
162   MultiThreadImageReader::~MultiThreadImageReader()
163   {
164     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
165     //        <<std::endl;
166     Stop();
167     if (mReader) delete mReader;
168   }
169   //=====================================================================
170
171   //=====================================================================
172   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
173                                                     int priority)
174   {
175     // not in unload queue : ciao
176     if (p->UnloadIndex()<0) return;
177     int old_prio = p->GetPriority();
178     if (priority > old_prio) 
179       {
180         p->SetPriority(priority);
181         mUnloadQueue.downsort(p->UnloadIndex());
182       }
183     else if ( old_prio > priority )
184       {
185         p->SetPriority(priority);
186         mUnloadQueue.upsort(p->UnloadIndex());
187      }
188   }
189   //=====================================================================
190
191   //=====================================================================
192   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
193                                         const std::string& filename, 
194                                         int priority )
195   {
196         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
197     
198           if (mNumberOfThreadedReadersRunning==0)
199 //    if (mThreadedImageReaderList.size()==0) 
200       {
201         // no detached reader : use self reader
202         ImageToLoad itl(user,filename);
203         ImageMapType::iterator i = mImages.find(&itl);
204         if (i!=mImages.end())
205           {
206             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
207             // Already inserted
208             if (pitl->GetImage() != 0)
209               {
210                 // Already read
211                 pitl->SetUser(user);
212                 UpdateUnloadPriority(pitl,priority);
213                 SignalImageRead(pitl,false);
214                 return; // pitl->GetImage();
215               }
216           }
217         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
218         mImages[pitl] = 0;
219         pitl->SetImage(mReader->ReadImage(filename));
220         UpdateUnloadPriority(pitl,priority);
221         SignalImageRead(pitl,true);
222         //      return pitl->GetImage();
223         return;
224       }
225
226
227     ImageToLoad itl(user,filename);
228     ImageMapType::iterator i = mImages.find(&itl);
229     if (i!=mImages.end())
230       {
231         // Already inserted
232         if (i->first->GetImage() != 0)
233           {
234             // Already read : ok :signal the user
235             UpdateUnloadPriority(i->first,priority);
236             SignalImageRead(i->first,false);
237             return;
238           }
239         /// Already requested : change the priority
240         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
241         pitl->SetPriority(priority);
242         // Already in queue
243         if (pitl->Index()>=0) 
244           {
245             // Re-sort the queue
246             mQueue.upsort(pitl->Index());
247           }
248         // Not read but not in queue = being read = ok
249         else 
250           {
251             
252           }
253       }
254     else 
255       {
256         // Never requested before or unloaded 
257         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
258         mImages[pitl] = 0;
259         mQueue.insert(pitl);
260       }
261   }
262   //=====================================================================
263
264   //=====================================================================
265   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
266   (const std::string& filename,
267    MultiThreadImageReaderUser::EventType e,
268    vtkImageData* image)
269   {
270     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
271         (filename == mRequestedFilename))
272       {
273         mRequestedImage = image;
274       }
275     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
276       {
277         mNumberOfThreadedReadersRunning++;
278         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
279       }
280     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
281       {
282         mNumberOfThreadedReadersRunning--;
283         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
284       }
285   }
286   //=====================================================================
287
288   //=====================================================================
289   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
290   {
291          // Start();
292     //    std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
293     //        <<std::endl;
294     
295     do 
296       {
297         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
298                 
299                   if (mNumberOfThreadedReadersRunning==0)
300 //      if (mThreadedImageReaderList.size()==0)
301           {
302             ImageToLoad itl(this,filename);
303             ImageMapType::iterator i = mImages.find(&itl);
304             if (i!=mImages.end())
305               {
306                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
307                 // Already inserted
308                 if (pitl->GetImage() != 0)
309                   {
310                     // Already read
311                     UpdateUnloadPriority(pitl,
312                                          GetMaximalPriorityWithoutLocking()+1);
313                     return pitl->GetImage();
314                   }
315               }
316             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
317             mImages[pitl] = 0;
318             pitl->SetImage(mReader->ReadImage(filename));
319             UpdateUnloadPriority(pitl,
320                                  GetMaximalPriorityWithoutLocking()+1);
321             return pitl->GetImage();
322           }
323         
324         mRequestedFilename = filename;
325         mRequestedImage = 0;
326         ImageToLoad itl(this,filename);
327         ImageMapType::iterator i = mImages.find(&itl);
328         if (i!=mImages.end())
329           {
330             // Already inserted in queue
331             if (i->first->GetImage() != 0)
332               {
333                 // Already read : ok : return it 
334                 return i->first->GetImage();
335               }
336             /// Already requested : change the priority
337               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
338               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
339               pitl->SetUser( this );
340               // Already in queue
341               if (pitl->Index()>=0) 
342                 {
343                   // Re-sort the queue
344                   mQueue.upsort(pitl->Index());
345                 }
346               // Not read but not in queue = being read = ok
347               else 
348                 {
349                   pitl->SetUser( this );
350                 }
351           }
352         else 
353           {
354             
355             // Never requested before or unloaded 
356             ImageToLoadPtr pitl = 
357               new ImageToLoad(this,filename,
358                               GetMaximalPriorityWithoutLocking() + 1);
359             mImages[pitl] = 0;
360             mQueue.insert(pitl);
361           }
362       }
363     while (0);
364
365     //    std::cout << "Waiting..."<<std::endl;
366
367     // Waiting that it is read
368     int n = 0;
369     do 
370       {
371         //      std::cout << n++ << std::endl;
372         wxMilliSleep(10);
373         do 
374           {
375             //      wxMutexLocker lock(mMutex);
376             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
377             if (mRequestedImage!=0) 
378               {
379                 return mRequestedImage;
380               } 
381           }
382         while (0);
383       }
384     while (true);
385     // 
386     
387   }
388   //=====================================================================
389   
390   //=====================================================================
391   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
392                                                bool purge)
393   {
394     
395 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
396     //    std::cout << "this="<<this <<std::endl;
397     //    std::cout << "user="<<p->GetUser() <<std::endl;
398
399     if ( p->GetUser() == this ) 
400       GetMultiThreadImageReaderUserMutex().Unlock();
401
402     p->GetUser()->MultiThreadImageReaderSendEvent
403       (p->GetFilename(),
404        MultiThreadImageReaderUser::ImageLoaded,
405        p->GetImage());
406
407     /*
408       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
409       BUGGY : TO FIX 
410     */
411     if (!purge)  return;
412     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
413
414     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
415            
416     mUnloadQueue.insert(p);
417     p->GetImage()->UpdateInformation();
418     p->GetImage()->PropagateUpdateExtent();
419     long ImMem = p->GetImage()->GetEstimatedMemorySize();
420     mTotalMem += ImMem;
421
422     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
423     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
424
425     //  return;
426
427     while (mTotalMem > mTotalMemMax)
428       {
429         GimmickMessage(5,
430                        "   ! Exceeded max of "
431                        << mTotalMemMax << " Ko : unloading oldest image ... "
432                        << std::endl);
433         if ( mUnloadQueue.size() <= 1 ) 
434           {
435              GimmickMessage(5,
436                             "   Only one image : cannot load AND unload it !!"
437                             <<std::endl);
438             break; 
439             
440           }
441         ImageToLoadPtr unload = mUnloadQueue.remove_top();
442         MultiThreadImageReaderUser* user = unload->GetUser();
443
444         /*
445         if ((user!=0)&&(user!=this)) 
446           {
447             user->GetMultiThreadImageReaderUserMutex().Lock();
448           }
449         */
450
451         std::string filename = unload->GetFilename();
452
453         GimmickMessage(5,"'" << filename << "'" << std::endl);
454         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
455
456         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
457
458
459         if (user!=0) 
460           {
461             //      std::cout << "unlock..."<<std::endl;
462             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
463             //      std::cout << "event"<<std::endl;
464             user->MultiThreadImageReaderSendEvent
465               (filename,
466                MultiThreadImageReaderUser::ImageUnloaded,
467                0);
468             //      std::cout << "event ok"<<std::endl;
469
470           }     
471
472         if (unload->Index()>=0)
473           {
474             // GimmickMessage(5,"still in queue"<<std::endl);
475           }
476         unload->Index() = -1;
477
478
479         ImageMapType::iterator it = mImages.find(unload);
480         if (it!=mImages.end())
481           {
482             mImages.erase(it);
483           }
484         //          std::cout << "delete..."<<std::endl;
485         delete unload;
486         //          std::cout << "delete ok."<<std::endl;
487
488       }
489     
490   
491   }
492   //=====================================================================
493
494   //=====================================================================
495   int MultiThreadImageReader::GetMaximalPriority()
496   { 
497     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
498     return GetMaximalPriorityWithoutLocking();
499   }
500   //=====================================================================
501
502
503   //=====================================================================
504   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
505   { 
506     long max = 0;
507     if (mQueue.size()>0) 
508       {
509         max = mQueue.top()->GetPriority();
510       }
511     if (mUnloadQueue.size()>0)
512       {
513         int max2 = mUnloadQueue.top()->GetPriority();
514         if (max2>max) max=max2;
515       }
516     return max;
517   }
518   //=====================================================================
519
520
521   //=====================================================================
522   //=====================================================================
523   //=====================================================================
524   //=====================================================================
525
526   //=====================================================================
527   void*  ThreadedImageReader::Entry()
528   {
529     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
530     //                << std::endl;
531
532     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
533       ("",
534        MultiThreadImageReaderUser::ThreadedReaderStarted,
535        0);
536
537     // While was not deleted 
538     while (!TestDestroy())
539       {
540         //      std::cout << "### Thread "<<GetCurrentId()<<" waiting for image"
541         //        << std::endl;
542           
543         // Lock the mutex
544         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
545         //mMutex.Lock();
546         // If image in queue
547         if (mMultiThreadImageReader->mQueue.size()>0)
548           {
549             MultiThreadImageReader::ImageToLoadPtr i = 
550               mMultiThreadImageReader->mQueue.remove_top();
551
552             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
553             //mMutex.Unlock();
554
555             
556             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
557             //                << i->GetFilename() << "'" << std::endl;
558             
559             // Do the job
560             vtkImageData* im = Read(i->GetFilename());
561
562             // Store it in the map
563             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
564             //mMutex.Lock();
565             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
566             MultiThreadImageReader::ImageMapType::iterator it = 
567               mMultiThreadImageReader->mImages.find(&itl);
568             MultiThreadImageReader::ImageToLoadPtr 
569               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
570               (it->first);
571             pitl->SetImage(im);
572             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
573             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
574             
575             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
576             //                << i->GetFilename() << "' : DONE" << std::endl;
577             
578           }
579         else 
580           {
581             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
582             //mMutex.Unlock();
583             // Wait a little to avoid blocking 
584             Sleep(10);
585           }
586       };
587     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
588     //                << std::endl;
589        
590     return 0;
591   }
592   //=====================================================================
593
594   //=====================================================================
595   void ThreadedImageReader::OnExit()
596   {
597     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
598       ("",
599        MultiThreadImageReaderUser::ThreadedReaderStopped,
600        0);
601   }
602   //=====================================================================
603
604   //=====================================================================
605   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
606   {
607     return mReader.ReadImage(filename);
608   }
609   //=====================================================================
610
611 } // namespace creaImageIO