]> Creatis software - creaImageIO.git/blob - src/creaImageIOMultiThreadImageReader.cpp
#3111 creaImageIO Bug New Normal - branch vtk7itk4 compilation with vtk7
[creaImageIO.git] / src / creaImageIOMultiThreadImageReader.cpp
1 /*
2 # ---------------------------------------------------------------------
3 #
4 # Copyright (c) CREATIS (Centre de Recherche en Acquisition et Traitement de l'Image 
5 #                        pour la Santé)
6 # Authors : Eduardo Davila, Frederic Cervenansky, Claire Mouton
7 # Previous Authors : Laurent Guigues, Jean-Pierre Roux
8 # CreaTools website : www.creatis.insa-lyon.fr/site/fr/creatools_accueil
9 #
10 #  This software is governed by the CeCILL-B license under French law and 
11 #  abiding by the rules of distribution of free software. You can  use, 
12 #  modify and/ or redistribute the software under the terms of the CeCILL-B 
13 #  license as circulated by CEA, CNRS and INRIA at the following URL 
14 #  http://www.cecill.info/licences/Licence_CeCILL-B_V1-en.html 
15 #  or in the file LICENSE.txt.
16 #
17 #  As a counterpart to the access to the source code and  rights to copy,
18 #  modify and redistribute granted by the license, users are provided only
19 #  with a limited warranty  and the software's author,  the holder of the
20 #  economic rights,  and the successive licensors  have only  limited
21 #  liability. 
22 #
23 #  The fact that you are presently reading this means that you have had
24 #  knowledge of the CeCILL-B license and that you accept its terms.
25 # ------------------------------------------------------------------------
26 */
27
28
29 #include <creaImageIOMultiThreadImageReader.h>
30 #include <creaImageIOImageReader.h>
31 #include <wx/utils.h>
32 #include <creaImageIOSystem.h>
33
34 #include <creaImageIOGimmick.h>
35 #ifdef _DEBUG
36 #define new DEBUG_NEW
37 #endif
38 namespace creaImageIO
39 {
40
41   //=====================================================================
42   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
43   ( const std::string& filename,
44     EventType type,
45     vtkImageData* image)
46   {
47     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
48
49     this->OnMultiThreadImageReaderEvent(filename,type,image);
50   }
51   //=====================================================================
52
53   //=====================================================================
54   class ThreadedImageReader: public wxThread
55   {
56   public:
57     ThreadedImageReader(MultiThreadImageReader* tir) :
58       mMultiThreadImageReader(tir)
59     {}
60
61     void* Entry();
62     void  OnExit();
63
64     vtkImageData* Read(const std::string& filename);
65     
66         struct deleter
67         {
68                 void operator()(ThreadedImageReader* p)
69                 {
70                         p->Delete();
71                 }
72         };
73         friend struct deleter;
74
75
76   private:
77     ImageReader mReader;
78     MultiThreadImageReader* mMultiThreadImageReader;
79         
80   };
81
82   //=====================================================================
83
84   
85   //=====================================================================
86   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
87     : //mDoNotSignal(false),
88       mReader(0),
89       mTotalMem(0),
90       mTotalMemMax(1000000)
91   {
92     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
93     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
94
95           mDone = false;
96     // Create the threads
97     for (int i=0; i<number_of_threads; i++) 
98       {
99                   //ThreadedImageReader* t = new ThreadedImageReader(this);
100                   boost::shared_ptr<ThreadedImageReader> t(new ThreadedImageReader(this), ThreadedImageReader::deleter());
101         mThreadedImageReaderList.push_back(t);
102          std::cout << "  ===> Thread "<<i
103                       <<" successfully added"<< std::endl;
104       }
105     mNumberOfThreadedReadersRunning = 0;
106     // Init the queue
107     mQueue.set(mComparator);
108     mQueue.set(mIndexer);
109     // 
110     // no thread : alloc self reader
111 //    if (number_of_threads==0)
112 //      {
113         mReader = new ImageReader();
114 //      }
115   }
116   //=====================================================================
117
118
119   //=====================================================================
120   bool MultiThreadImageReader::Start()
121   {
122
123     //    std::cout << "#### MultiThreadImageReader::Start()"
124     //                <<std::endl;
125           if (mNumberOfThreadedReadersRunning > 0) return true;
126           
127     ThreadedImageReaderListType::iterator i;
128     for (i =mThreadedImageReaderList.begin();
129          i!=mThreadedImageReaderList.end();
130          i++)
131       {
132         (*i)->Create();
133         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
134           {
135             std::cout << "ERROR starting a thread"<< std::endl;
136             return false;
137           }
138         else 
139           {
140                     std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
141                               <<" successfully created"<< std::endl;
142             
143           }
144       }
145     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
146     //    std::cout << "EO Start : #Threads running = "
147     //                << mNumberOfThreadedReadersRunning<<std::endl;
148
149     return true;
150   }
151   //=====================================================================
152
153   //=====================================================================
154   void MultiThreadImageReader::Stop()
155   { 
156 //                  std::cout << "#### MultiThreadImageReader::Stop()"
157 //            <<std::endl;
158   //  std::cout << "Sending stop order to the threads..."<<std::endl;
159           if (mDone) return;
160
161     ThreadedImageReaderListType::iterator i;
162     for (i =mThreadedImageReaderList.begin();
163          i!=mThreadedImageReaderList.end();
164          i++)
165       { std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
166                               <<" successfully stopped"<< std::endl;
167                   if((*i)->IsAlive())
168                   {(*i)->Pause();
169                           (*i).reset();
170                          //                       (*i)->Delete();
171                   }
172       }
173    mThreadedImageReaderList.clear();
174     // Wait a little to be sure that all threads have stopped
175     // A better way to do this ?
176     //    wxMilliSleep(1000);
177     // New method : the threads generate a stop event when they have finished
178     // We wait until all threads have stopped
179 //        std::cout << "Waiting for stop signals..."<<std::endl;
180     do 
181       {
182         // Sleep a little
183                 wxMilliSleep(10);
184         // Lock
185         {
186           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
187 //                std::cout << "#Threads running = "
188 //                          << mNumberOfThreadedReadersRunning<<std::endl;
189           // Break if all readers have stopped
190           if (mNumberOfThreadedReadersRunning <= 0) 
191             {
192               break;
193             }
194         }
195       } 
196     while (true);
197 //        std::cout << "All threads stopped : OK "<<std::endl;
198
199     ImageMapType::iterator j;
200     for (j =mImages.begin();
201          j!=mImages.end();
202          ++j)
203
204       {
205         delete j->first;
206       }
207     mImages.clear();
208         mDone = true;
209   }
210   //=====================================================================
211
212   //=====================================================================
213   MultiThreadImageReader::~MultiThreadImageReader()
214   {
215     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
216     //        <<std::endl;
217     Stop();
218     if (mReader) delete mReader;
219         mThreadedImageReaderList.clear();
220   }
221   //=====================================================================
222
223   //=====================================================================
224   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
225                                                     int priority)
226   {
227     // not in unload queue : ciao
228     if (p->UnloadIndex()<0) return;
229     int old_prio = p->GetPriority();
230     if (priority > old_prio) 
231       {
232         p->SetPriority(priority);
233         mUnloadQueue.downsort(p->UnloadIndex());
234       }
235     else if ( old_prio > priority )
236       {
237         p->SetPriority(priority);
238         mUnloadQueue.upsort(p->UnloadIndex());
239      }
240   }
241   //=====================================================================
242   // function to read attributes for a file
243   void MultiThreadImageReader::getAttributes(const std::string filename, 
244           std::map <std::string , std::string> &infos,std::vector<std::string> i_attr)
245   {
246           mReader->getAttributes(filename, infos, i_attr);
247   }
248
249   //=====================================================================
250   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
251                                         const std::string& filename, 
252                                         int priority )
253   {
254         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
255
256           if (mNumberOfThreadedReadersRunning==0)
257 //    if (mThreadedImageReaderList.size()==0) 
258       {
259         // no detached reader : use self reader
260         ImageToLoad itl(user,filename);
261         ImageMapType::iterator i = mImages.find(&itl);
262         if (i!=mImages.end())
263           {
264             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
265             // Already inserted
266             if (pitl->GetImage() != 0)
267               {
268                 // Already read
269                 pitl->SetUser(user);
270                 UpdateUnloadPriority(pitl,priority);
271                 SignalImageRead(pitl,false);
272                 return; // pitl->GetImage();
273               }
274           }
275         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
276         mImages[pitl] = 0;
277         pitl->SetImage(mReader->ReadImage(filename));
278         UpdateUnloadPriority(pitl,priority);
279         SignalImageRead(pitl,true);
280         //      return pitl->GetImage();
281         return;
282       }
283
284     ImageToLoad itl(user,filename);
285     ImageMapType::iterator i = mImages.find(&itl);
286     if (i!=mImages.end())
287       {
288         // Already inserted
289         if (i->first->GetImage() != 0)
290           {
291             // Already read : ok :signal the user
292             UpdateUnloadPriority(i->first,priority);
293             SignalImageRead(i->first,false);
294             return;
295           }
296         /// Already requested : change the priority
297         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
298         pitl->SetPriority(priority);
299         // Already in queue
300         if (pitl->Index()>=0) 
301           {
302             // Re-sort the queue
303             mQueue.upsort(pitl->Index());
304           }
305         // Not read but not in queue = being read = ok
306         else 
307           {
308             
309           }
310       }
311     else 
312       {
313         // Never requested before or unloaded 
314         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
315         mImages[pitl] = 0;
316         mQueue.insert(pitl);
317       }
318   }
319   //=====================================================================
320
321   //=====================================================================
322   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
323   (const std::string& filename,
324    MultiThreadImageReaderUser::EventType e,
325    vtkImageData* image)
326   {
327     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
328         (filename == mRequestedFilename))
329       {
330         mRequestedImage = image;
331       }
332     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
333       {
334         mNumberOfThreadedReadersRunning++;
335         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
336       }
337     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
338       {
339         
340                  mNumberOfThreadedReadersRunning--;
341         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
342       }
343   }
344   //=====================================================================
345
346   //=====================================================================
347   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
348   {
349          // Start();
350     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
351     //           <<std::endl;
352     
353     do 
354       {
355         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
356                 
357         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
358         //             <<"') lock ok"
359         //               <<std::endl;
360     
361         //                if (mNumberOfThreadedReadersRunning==0)
362         //      if (mThreadedImageReaderList.size()==0)
363         if (true)
364           {
365             ImageToLoad itl(this,filename);
366             ImageMapType::iterator i = mImages.find(&itl);
367             if (i!=mImages.end())
368               {
369                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
370                 // Already inserted
371                 if (pitl->GetImage() != 0)
372                   {
373                     // Already read
374                     UpdateUnloadPriority(pitl,
375                                          GetMaximalPriorityWithoutLocking()+1);
376                     return pitl->GetImage();
377                   }
378               }
379             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
380             mImages[pitl] = 0;
381             pitl->SetImage(mReader->ReadImage(filename));
382             UpdateUnloadPriority(pitl,
383                                  GetMaximalPriorityWithoutLocking()+1);
384             return pitl->GetImage();
385           }
386
387         /*      
388         mRequestedFilename = filename;
389         mRequestedImage = 0;
390         ImageToLoad itl(this,filename);
391         ImageMapType::iterator i = mImages.find(&itl);
392         if (i!=mImages.end())
393           {
394             // Already inserted in queue
395             if (i->first->GetImage() != 0)
396               {
397                 // Already read : ok : return it 
398                 return i->first->GetImage();
399               }
400             /// Already requested : change the priority
401               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
402               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
403               pitl->SetUser( this );
404               // Already in queue
405               if (pitl->Index()>=0) 
406                 {
407                   // Re-sort the queue
408                   mQueue.upsort(pitl->Index());
409                 }
410               // Not read but not in queue = being read = ok
411               else 
412                 {
413                   pitl->SetUser( this );
414                 }
415           }
416         else 
417           {
418             
419             // Never requested before or unloaded 
420             ImageToLoadPtr pitl = 
421               new ImageToLoad(this,filename,
422                               GetMaximalPriorityWithoutLocking() + 1);
423             mImages[pitl] = 0;
424             mQueue.insert(pitl);
425           }
426         */
427       }
428     while (0);
429
430     //    std::cout << "Waiting..."<<std::endl;
431
432     /*
433     // Waiting that it is read
434     int n = 0;
435     do 
436       {
437         //      std::cout << n++ << std::endl;
438         wxMilliSleep(10);
439         do 
440           {
441             //      wxMutexLocker lock(mMutex);
442             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
443             if (mRequestedImage!=0) 
444               {
445                 return mRequestedImage;
446               } 
447           }
448         while (0);
449       }
450     while (true);
451     // 
452     */
453   }
454   //=====================================================================
455   
456   //=====================================================================
457   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
458                                                bool purge)
459   {
460     
461 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
462     //    std::cout << "this="<<this <<std::endl;
463     //    std::cout << "user="<<p->GetUser() <<std::endl;
464
465     if ( p->GetUser() == this ) 
466         {
467       GetMultiThreadImageReaderUserMutex().Unlock();
468         }
469
470     p->GetUser()->MultiThreadImageReaderSendEvent
471       (p->GetFilename(),
472        MultiThreadImageReaderUser::ImageLoaded,
473        p->GetImage());
474
475     /*
476       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
477       BUGGY : TO FIX 
478     */
479     if (!purge)  return;
480     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
481
482     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
483            
484     mUnloadQueue.insert(p);
485
486
487 //EED 2017-01-01 Migration VTK7
488 #if VTK_MAJOR_VERSION <= 5
489     p->GetImage()->UpdateInformation();
490     p->GetImage()->PropagateUpdateExtent();
491     long ImMem = p->GetImage()->GetEstimatedMemorySize();
492 #else
493         int ext[6];
494         int dim[3];
495         p->GetImage()->GetExtent(ext);
496         dim[0]          = ext[1]-ext[0]+1;
497         dim[1]          = ext[3]-ext[2]+1;
498         dim[2]          = ext[5]-ext[4]+1;
499     long ImMem  = dim[0]*dim[1]*dim[2]*p->GetImage()->GetScalarSize();;
500 #endif
501     mTotalMem += ImMem;
502
503     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
504     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
505
506     //  return;
507
508     while (mTotalMem > mTotalMemMax)
509       {
510         GimmickMessage(5,
511                        "   ! Exceeded max of "
512                        << mTotalMemMax << " Ko : unloading oldest image ... "
513                        << std::endl);
514         if ( mUnloadQueue.size() <= 1 ) 
515           {
516              GimmickMessage(5,
517                             "   Only one image : cannot load AND unload it !!"
518                             <<std::endl);
519             break; 
520             
521           }
522         ImageToLoadPtr unload = mUnloadQueue.remove_top();
523         MultiThreadImageReaderUser* user = unload->GetUser();
524
525         /*
526         if ((user!=0)&&(user!=this)) 
527           {
528             user->GetMultiThreadImageReaderUserMutex().Lock();
529           }
530         */
531
532         std::string filename = unload->GetFilename();
533
534         GimmickMessage(5,"'" << filename << "'" << std::endl);
535
536 //EED 2017-01-01 Migration VTK7
537 #if VTK_MAJOR_VERSION <= 5
538         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
539 #else
540         int ext[6];
541         int dim[3];
542         unload->GetImage()->GetExtent(ext);
543         dim[0]          = ext[1]-ext[0]+1;
544         dim[1]          = ext[3]-ext[2]+1;
545         dim[2]          = ext[5]-ext[4]+1;
546         mTotalMem -= dim[0]*dim[1]*dim[2]*unload->GetImage()->GetScalarSize();
547 #endif
548
549         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
550
551         if (user!=0) 
552           {
553             //      std::cout << "unlock..."<<std::endl;
554             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
555             //      std::cout << "event"<<std::endl;
556             user->MultiThreadImageReaderSendEvent
557               (filename,
558                MultiThreadImageReaderUser::ImageUnloaded,
559                0);
560             //      std::cout << "event ok"<<std::endl;
561           }     
562
563         if (unload->Index()>=0)
564           {
565             // GimmickMessage(5,"still in queue"<<std::endl);
566           }
567         unload->Index() = -1;
568
569
570         ImageMapType::iterator it = mImages.find(unload);
571         if (it!=mImages.end())
572           {
573             mImages.erase(it);
574           }
575         //          std::cout << "delete..."<<std::endl;
576         delete unload;
577         //          std::cout << "delete ok."<<std::endl;
578
579       }
580   }
581   //=====================================================================
582
583   //=====================================================================
584   int MultiThreadImageReader::GetMaximalPriority()
585   { 
586     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
587     return GetMaximalPriorityWithoutLocking();
588   }
589   //=====================================================================
590
591
592   //=====================================================================
593   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
594   { 
595     long max = 0;
596     if (mQueue.size()>0) 
597       {
598         max = mQueue.top()->GetPriority();
599       }
600     if (mUnloadQueue.size()>0)
601       {
602         int max2 = mUnloadQueue.top()->GetPriority();
603         if (max2>max) max=max2;
604       }
605     return max;
606   }
607   //=====================================================================
608
609
610   //=====================================================================
611   //=====================================================================
612   //=====================================================================
613   //=====================================================================
614
615   //=====================================================================
616   void*  ThreadedImageReader::Entry()
617   {
618     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
619     //                << std::endl;
620
621     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
622       ("",
623        MultiThreadImageReaderUser::ThreadedReaderStarted,
624        0);
625
626     // While was not deleted 
627     while (!TestDestroy())
628       {
629                 //std::cout << "### Thread "<<GetCurrentId()<<" still alive"  << std::endl;
630           
631         // Lock the mutex
632         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
633         //mMutex.Lock();
634         // If image in queue
635         if (mMultiThreadImageReader->mQueue.size()>0)
636           {
637             MultiThreadImageReader::ImageToLoadPtr i = 
638               mMultiThreadImageReader->mQueue.remove_top();
639
640             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
641             //mMutex.Unlock();
642
643             
644             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
645             //                << i->GetFilename() << "'" << std::endl;
646             
647             // Do the job
648             vtkImageData* im = Read(i->GetFilename());
649
650             // Store it in the map
651             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
652             //mMutex.Lock();
653             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
654             MultiThreadImageReader::ImageMapType::iterator it = 
655               mMultiThreadImageReader->mImages.find(&itl);
656             MultiThreadImageReader::ImageToLoadPtr 
657               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
658               (it->first);
659             pitl->SetImage(im);
660             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
661             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
662             
663             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
664             //                << i->GetFilename() << "' : DONE" << std::endl;
665             
666           }
667         else 
668           {
669             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
670             //mMutex.Unlock();
671             // Wait a little to avoid blocking 
672             Sleep(10);
673           }
674       };
675     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
676     //                << std::endl;
677        
678     return 0;
679   }
680   //=====================================================================
681
682   //=====================================================================
683   void ThreadedImageReader::OnExit()
684   {
685     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
686       ("",
687        MultiThreadImageReaderUser::ThreadedReaderStopped,
688        0);
689   }
690   //=====================================================================
691
692   //=====================================================================
693   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
694   {
695     return mReader.ReadImage(filename);
696   }
697   //=====================================================================
698
699 } // namespace creaImageIO