]> Creatis software - creaImageIO.git/blob - src/creaImageIOMultiThreadImageReader.cpp
#3185 creaImageIO Feature New Normal - Clean code
[creaImageIO.git] / src / creaImageIOMultiThreadImageReader.cpp
1 /*
2 # ---------------------------------------------------------------------
3 #
4 # Copyright (c) CREATIS (Centre de Recherche en Acquisition et Traitement de l'Image 
5 #                        pour la Santé)
6 # Authors : Eduardo Davila, Frederic Cervenansky, Claire Mouton
7 # Previous Authors : Laurent Guigues, Jean-Pierre Roux
8 # CreaTools website : www.creatis.insa-lyon.fr/site/fr/creatools_accueil
9 #
10 #  This software is governed by the CeCILL-B license under French law and 
11 #  abiding by the rules of distribution of free software. You can  use, 
12 #  modify and/ or redistribute the software under the terms of the CeCILL-B 
13 #  license as circulated by CEA, CNRS and INRIA at the following URL 
14 #  http://www.cecill.info/licences/Licence_CeCILL-B_V1-en.html 
15 #  or in the file LICENSE.txt.
16 #
17 #  As a counterpart to the access to the source code and  rights to copy,
18 #  modify and redistribute granted by the license, users are provided only
19 #  with a limited warranty  and the software's author,  the holder of the
20 #  economic rights,  and the successive licensors  have only  limited
21 #  liability. 
22 #
23 #  The fact that you are presently reading this means that you have had
24 #  knowledge of the CeCILL-B license and that you accept its terms.
25 # ------------------------------------------------------------------------
26 */
27
28
29 #include <creaImageIOMultiThreadImageReader.h>
30 #include <creaImageIOImageReader.h>
31 #include <wx/utils.h>
32 #include <creaImageIOSystem.h>
33
34 #include <creaImageIOGimmick.h>
35 #ifdef _DEBUG
36 #define new DEBUG_NEW
37 #endif
38 namespace creaImageIO
39 {
40
41   //=====================================================================
42   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
43   ( const std::string& filename,
44     EventType type,
45     vtkImageData* image)
46   {
47     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
48
49     this->OnMultiThreadImageReaderEvent(filename,type,image);
50   }
51   //=====================================================================
52
53   //=====================================================================
54   class ThreadedImageReader: public wxThread
55   {
56   public:
57     ThreadedImageReader(MultiThreadImageReader* tir) :
58       mMultiThreadImageReader(tir)
59     {}
60
61     void* Entry();
62     void  OnExit();
63
64     vtkImageData* Read(const std::string& filename);
65     
66         struct deleter
67         {
68                 void operator()(ThreadedImageReader* p)
69                 {
70                         p->Delete();
71                 }
72         };
73         friend struct deleter;
74
75
76   private:
77     ImageReader mReader;
78     MultiThreadImageReader* mMultiThreadImageReader;
79         
80   };
81
82   //=====================================================================
83
84   
85   //=====================================================================
86   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
87     : //mDoNotSignal(false),
88       mReader(0),
89       mTotalMem(0),
90       mTotalMemMax(1000000)
91   {
92     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
93     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
94
95           mDone = false;
96     // Create the threads
97     for (int i=0; i<number_of_threads; i++) 
98       {
99                   //ThreadedImageReader* t = new ThreadedImageReader(this);
100                   boost::shared_ptr<ThreadedImageReader> t(new ThreadedImageReader(this), ThreadedImageReader::deleter());
101         mThreadedImageReaderList.push_back(t);
102          std::cout << "  ===> Thread "<<i
103                       <<" successfully added"<< std::endl;
104       }
105     mNumberOfThreadedReadersRunning = 0;
106     // Init the queue
107     mQueue.set(mComparator);
108     mQueue.set(mIndexer);
109     // 
110     // no thread : alloc self reader
111 //    if (number_of_threads==0)
112 //      {
113         mReader = new ImageReader();
114 //      }
115   }
116   //=====================================================================
117
118
119   //=====================================================================
120   bool MultiThreadImageReader::Start()
121   {
122
123     //    std::cout << "#### MultiThreadImageReader::Start()"
124     //                <<std::endl;
125           if (mNumberOfThreadedReadersRunning > 0) return true;
126           
127     ThreadedImageReaderListType::iterator i;
128     for (i =mThreadedImageReaderList.begin();
129          i!=mThreadedImageReaderList.end();
130          i++)
131       {
132         (*i)->Create();
133         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
134           {
135             std::cout << "ERROR starting a thread"<< std::endl;
136             return false;
137           }
138         else 
139           {
140                     std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
141                               <<" successfully created"<< std::endl;
142             
143           }
144       }
145     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
146     //    std::cout << "EO Start : #Threads running = "
147     //                << mNumberOfThreadedReadersRunning<<std::endl;
148
149     return true;
150   }
151   //=====================================================================
152
153   //=====================================================================
154   void MultiThreadImageReader::Stop()
155   { 
156 //                  std::cout << "#### MultiThreadImageReader::Stop()"
157 //            <<std::endl;
158   //  std::cout << "Sending stop order to the threads..."<<std::endl;
159           if (mDone) return;
160
161     ThreadedImageReaderListType::iterator i;
162     for (i =mThreadedImageReaderList.begin();
163          i!=mThreadedImageReaderList.end();
164          i++)
165       { std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
166                               <<" successfully stopped"<< std::endl;
167                   if((*i)->IsAlive())
168                   {(*i)->Pause();
169                           (*i).reset();
170                          //                       (*i)->Delete();
171                   }
172       }
173    mThreadedImageReaderList.clear();
174     // Wait a little to be sure that all threads have stopped
175     // A better way to do this ?
176     //    wxMilliSleep(1000);
177     // New method : the threads generate a stop event when they have finished
178     // We wait until all threads have stopped
179 //        std::cout << "Waiting for stop signals..."<<std::endl;
180     do 
181       {
182         // Sleep a little
183                 wxMilliSleep(10);
184         // Lock
185         {
186           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
187 //                std::cout << "#Threads running = "
188 //                          << mNumberOfThreadedReadersRunning<<std::endl;
189           // Break if all readers have stopped
190           if (mNumberOfThreadedReadersRunning <= 0) 
191             {
192               break;
193             }
194         }
195       } 
196     while (true);
197 //        std::cout << "All threads stopped : OK "<<std::endl;
198
199     ImageMapType::iterator j;
200     for (j =mImages.begin();
201          j!=mImages.end();
202          ++j)
203
204       {
205         delete j->first;
206       }
207     mImages.clear();
208         mDone = true;
209   }
210   //=====================================================================
211
212   //=====================================================================
213   MultiThreadImageReader::~MultiThreadImageReader()
214   {
215     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
216     //        <<std::endl;
217     Stop();
218     if (mReader) delete mReader;
219         mThreadedImageReaderList.clear();
220   }
221   //=====================================================================
222
223   //=====================================================================
224   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
225                                                     int priority)
226   {
227     // not in unload queue : ciao
228     if (p->UnloadIndex()<0) return;
229     int old_prio = p->GetPriority();
230     if (priority > old_prio) 
231       {
232         p->SetPriority(priority);
233         mUnloadQueue.downsort(p->UnloadIndex());
234       }
235     else if ( old_prio > priority )
236       {
237         p->SetPriority(priority);
238         mUnloadQueue.upsort(p->UnloadIndex());
239      }
240   }
241   //=====================================================================
242   // function to read attributes for a file
243   void MultiThreadImageReader::getAttributes(const std::string filename, 
244           std::map <std::string , std::string> &infos,std::vector<std::string> i_attr)
245   {
246           mReader->getAttributes(filename, infos, i_attr);
247   }
248
249   //=====================================================================
250   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
251                                         const std::string& filename, 
252                                         int priority )
253   {
254         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
255
256           if (mNumberOfThreadedReadersRunning==0)
257 //    if (mThreadedImageReaderList.size()==0) 
258       {
259         // no detached reader : use self reader
260         ImageToLoad itl(user,filename);
261         ImageMapType::iterator i = mImages.find(&itl);
262         if (i!=mImages.end())
263           {
264             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
265             // Already inserted
266             if (pitl->GetImage() != 0)
267               {
268                 // Already read
269                 pitl->SetUser(user);
270                 UpdateUnloadPriority(pitl,priority);
271                 SignalImageRead(pitl,false);
272                 return; // pitl->GetImage();
273               }
274           }
275         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
276         mImages[pitl] = 0;
277         pitl->SetImage(mReader->ReadImage(filename));
278         UpdateUnloadPriority(pitl,priority);
279         SignalImageRead(pitl,true);
280         //      return pitl->GetImage();
281         return;
282       }
283
284     ImageToLoad itl(user,filename);
285     ImageMapType::iterator i = mImages.find(&itl);
286     if (i!=mImages.end())
287       {
288         // Already inserted
289         if (i->first->GetImage() != 0)
290           {
291             // Already read : ok :signal the user
292             UpdateUnloadPriority(i->first,priority);
293             SignalImageRead(i->first,false);
294             return;
295           }
296         /// Already requested : change the priority
297         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
298         pitl->SetPriority(priority);
299         // Already in queue
300         if (pitl->Index()>=0) 
301           {
302             // Re-sort the queue
303             mQueue.upsort(pitl->Index());
304           }
305         // Not read but not in queue = being read = ok
306         else 
307           {
308             
309           }
310       }
311     else 
312       {
313         // Never requested before or unloaded 
314         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
315         mImages[pitl] = 0;
316         mQueue.insert(pitl);
317       }
318   }
319   //=====================================================================
320
321   //=====================================================================
322   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
323   (const std::string& filename,
324    MultiThreadImageReaderUser::EventType e,
325    vtkImageData* image)
326   {
327     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
328         (filename == mRequestedFilename))
329       {
330         mRequestedImage = image;
331       }
332     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
333       {
334         mNumberOfThreadedReadersRunning++;
335         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
336       }
337     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
338       {
339         
340                  mNumberOfThreadedReadersRunning--;
341         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
342       }
343   }
344   //=====================================================================
345
346   //=====================================================================
347   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
348   {
349          // Start();
350     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
351     //           <<std::endl;
352     
353     do 
354       {
355         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
356                 
357         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
358         //             <<"') lock ok"
359         //               <<std::endl;
360     
361         //                if (mNumberOfThreadedReadersRunning==0)
362         //      if (mThreadedImageReaderList.size()==0)
363         if (true)
364           {
365             ImageToLoad itl(this,filename);
366             ImageMapType::iterator i = mImages.find(&itl);
367             if (i!=mImages.end())
368               {
369                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
370                 // Already inserted
371                 if (pitl->GetImage() != 0)
372                   {
373                     // Already read
374                     UpdateUnloadPriority(pitl,
375                                          GetMaximalPriorityWithoutLocking()+1);
376                     return pitl->GetImage();
377                   }
378               }
379             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
380             mImages[pitl] = 0;
381             pitl->SetImage(mReader->ReadImage(filename));
382             UpdateUnloadPriority(pitl,
383                                  GetMaximalPriorityWithoutLocking()+1);
384             return pitl->GetImage();
385           }
386
387         /*      
388         mRequestedFilename = filename;
389         mRequestedImage = 0;
390         ImageToLoad itl(this,filename);
391         ImageMapType::iterator i = mImages.find(&itl);
392         if (i!=mImages.end())
393           {
394             // Already inserted in queue
395             if (i->first->GetImage() != 0)
396               {
397                 // Already read : ok : return it 
398                 return i->first->GetImage();
399               }
400             /// Already requested : change the priority
401               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
402               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
403               pitl->SetUser( this );
404               // Already in queue
405               if (pitl->Index()>=0) 
406                 {
407                   // Re-sort the queue
408                   mQueue.upsort(pitl->Index());
409                 }
410               // Not read but not in queue = being read = ok
411               else 
412                 {
413                   pitl->SetUser( this );
414                 }
415           }
416         else 
417           {
418             
419             // Never requested before or unloaded 
420             ImageToLoadPtr pitl = 
421               new ImageToLoad(this,filename,
422                               GetMaximalPriorityWithoutLocking() + 1);
423             mImages[pitl] = 0;
424             mQueue.insert(pitl);
425           }
426         */
427       }
428     while (0);
429
430     //    std::cout << "Waiting..."<<std::endl;
431
432     /*
433     // Waiting that it is read
434     int n = 0;
435     do 
436       {
437         //      std::cout << n++ << std::endl;
438         wxMilliSleep(10);
439         do 
440           {
441             //      wxMutexLocker lock(mMutex);
442             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
443             if (mRequestedImage!=0) 
444               {
445                 return mRequestedImage;
446               } 
447           }
448         while (0);
449       }
450     while (true);
451     // 
452     */
453   }
454   //=====================================================================
455   
456   //=====================================================================
457   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
458                                                bool purge)
459   {
460     
461 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
462     //    std::cout << "this="<<this <<std::endl;
463     //    std::cout << "user="<<p->GetUser() <<std::endl;
464
465     if ( p->GetUser() == this ) 
466       GetMultiThreadImageReaderUserMutex().Unlock();
467
468     p->GetUser()->MultiThreadImageReaderSendEvent
469       (p->GetFilename(),
470        MultiThreadImageReaderUser::ImageLoaded,
471        p->GetImage());
472
473     /*
474       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
475       BUGGY : TO FIX 
476     */
477     if (!purge)  return;
478     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
479
480     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
481            
482     mUnloadQueue.insert(p);
483     p->GetImage()->UpdateInformation();
484     p->GetImage()->PropagateUpdateExtent();
485     long ImMem = p->GetImage()->GetEstimatedMemorySize();
486     mTotalMem += ImMem;
487
488     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
489     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
490
491     //  return;
492
493     while (mTotalMem > mTotalMemMax)
494       {
495         GimmickMessage(5,
496                        "   ! Exceeded max of "
497                        << mTotalMemMax << " Ko : unloading oldest image ... "
498                        << std::endl);
499         if ( mUnloadQueue.size() <= 1 ) 
500           {
501              GimmickMessage(5,
502                             "   Only one image : cannot load AND unload it !!"
503                             <<std::endl);
504             break; 
505             
506           }
507         ImageToLoadPtr unload = mUnloadQueue.remove_top();
508         MultiThreadImageReaderUser* user = unload->GetUser();
509
510         /*
511         if ((user!=0)&&(user!=this)) 
512           {
513             user->GetMultiThreadImageReaderUserMutex().Lock();
514           }
515         */
516
517         std::string filename = unload->GetFilename();
518
519         GimmickMessage(5,"'" << filename << "'" << std::endl);
520         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
521
522         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
523
524         if (user!=0) 
525           {
526             //      std::cout << "unlock..."<<std::endl;
527             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
528             //      std::cout << "event"<<std::endl;
529             user->MultiThreadImageReaderSendEvent
530               (filename,
531                MultiThreadImageReaderUser::ImageUnloaded,
532                0);
533             //      std::cout << "event ok"<<std::endl;
534           }     
535
536         if (unload->Index()>=0)
537           {
538             // GimmickMessage(5,"still in queue"<<std::endl);
539           }
540         unload->Index() = -1;
541
542
543         ImageMapType::iterator it = mImages.find(unload);
544         if (it!=mImages.end())
545           {
546             mImages.erase(it);
547           }
548         //          std::cout << "delete..."<<std::endl;
549         delete unload;
550         //          std::cout << "delete ok."<<std::endl;
551
552       }
553   }
554   //=====================================================================
555
556   //=====================================================================
557   int MultiThreadImageReader::GetMaximalPriority()
558   { 
559     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
560     return GetMaximalPriorityWithoutLocking();
561   }
562   //=====================================================================
563
564
565   //=====================================================================
566   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
567   { 
568     long max = 0;
569     if (mQueue.size()>0) 
570       {
571         max = mQueue.top()->GetPriority();
572       }
573     if (mUnloadQueue.size()>0)
574       {
575         int max2 = mUnloadQueue.top()->GetPriority();
576         if (max2>max) max=max2;
577       }
578     return max;
579   }
580   //=====================================================================
581
582
583   //=====================================================================
584   //=====================================================================
585   //=====================================================================
586   //=====================================================================
587
588   //=====================================================================
589   void*  ThreadedImageReader::Entry()
590   {
591     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
592     //                << std::endl;
593
594     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
595       ("",
596        MultiThreadImageReaderUser::ThreadedReaderStarted,
597        0);
598
599     // While was not deleted 
600     while (!TestDestroy())
601       {
602                 //std::cout << "### Thread "<<GetCurrentId()<<" still alive"  << std::endl;
603           
604         // Lock the mutex
605         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
606         //mMutex.Lock();
607         // If image in queue
608         if (mMultiThreadImageReader->mQueue.size()>0)
609           {
610             MultiThreadImageReader::ImageToLoadPtr i = 
611               mMultiThreadImageReader->mQueue.remove_top();
612
613             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
614             //mMutex.Unlock();
615
616             
617             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
618             //                << i->GetFilename() << "'" << std::endl;
619             
620             // Do the job
621             vtkImageData* im = Read(i->GetFilename());
622
623             // Store it in the map
624             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
625             //mMutex.Lock();
626             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
627             MultiThreadImageReader::ImageMapType::iterator it = 
628               mMultiThreadImageReader->mImages.find(&itl);
629             MultiThreadImageReader::ImageToLoadPtr 
630               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
631               (it->first);
632             pitl->SetImage(im);
633             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
634             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
635             
636             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
637             //                << i->GetFilename() << "' : DONE" << std::endl;
638             
639           }
640         else 
641           {
642             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
643             //mMutex.Unlock();
644             // Wait a little to avoid blocking 
645             Sleep(10);
646           }
647       };
648     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
649     //                << std::endl;
650        
651     return 0;
652   }
653   //=====================================================================
654
655   //=====================================================================
656   void ThreadedImageReader::OnExit()
657   {
658     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
659       ("",
660        MultiThreadImageReaderUser::ThreadedReaderStopped,
661        0);
662   }
663   //=====================================================================
664
665   //=====================================================================
666   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
667   {
668     return mReader.ReadImage(filename);
669   }
670   //=====================================================================
671
672 } // namespace creaImageIO