]> Creatis software - creaImageIO.git/blob - src/creaImageIOMultiThreadImageReader.cpp
#3208 creaImageIO Feature New Normal - branch mingw64
[creaImageIO.git] / src / creaImageIOMultiThreadImageReader.cpp
1 /*
2 # ---------------------------------------------------------------------
3 #
4 # Copyright (c) CREATIS (Centre de Recherche en Acquisition et Traitement de l'Image 
5 #                        pour la Santé)
6 # Authors : Eduardo Davila, Frederic Cervenansky, Claire Mouton
7 # Previous Authors : Laurent Guigues, Jean-Pierre Roux
8 # CreaTools website : www.creatis.insa-lyon.fr/site/fr/creatools_accueil
9 #
10 #  This software is governed by the CeCILL-B license under French law and 
11 #  abiding by the rules of distribution of free software. You can  use, 
12 #  modify and/ or redistribute the software under the terms of the CeCILL-B 
13 #  license as circulated by CEA, CNRS and INRIA at the following URL 
14 #  http://www.cecill.info/licences/Licence_CeCILL-B_V1-en.html 
15 #  or in the file LICENSE.txt.
16 #
17 #  As a counterpart to the access to the source code and  rights to copy,
18 #  modify and redistribute granted by the license, users are provided only
19 #  with a limited warranty  and the software's author,  the holder of the
20 #  economic rights,  and the successive licensors  have only  limited
21 #  liability. 
22 #
23 #  The fact that you are presently reading this means that you have had
24 #  knowledge of the CeCILL-B license and that you accept its terms.
25 # ------------------------------------------------------------------------
26 */
27
28
29 #include <creaImageIOMultiThreadImageReader.h>
30 #include <creaImageIOImageReader.h>
31 #include <wx/utils.h>
32 #include <creaImageIOSystem.h>
33
34 #include <creaImageIOGimmick.h>
35 #ifdef _DEBUG
36 #define new DEBUG_NEW
37 #endif
38 namespace creaImageIO
39 {
40
41   //=====================================================================
42   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
43   ( const std::string& filename,
44     EventType type,
45     vtkImageData* image)
46   {
47     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
48
49     this->OnMultiThreadImageReaderEvent(filename,type,image);
50   }
51   //=====================================================================
52
53   //=====================================================================
54   class ThreadedImageReader: public wxThread
55   {
56   public:
57     ThreadedImageReader(MultiThreadImageReader* tir) :
58       mMultiThreadImageReader(tir)
59     {}
60
61     void* Entry();
62     void  OnExit();
63
64     vtkImageData* Read(const std::string& filename);
65     
66         struct deleter
67         {
68                 void operator()(ThreadedImageReader* p)
69                 {
70                         p->Delete();
71                 }
72         };
73         friend struct deleter;
74
75
76   private:
77     ImageReader mReader;
78     MultiThreadImageReader* mMultiThreadImageReader;
79         
80   };
81
82   //=====================================================================
83
84   
85   //=====================================================================
86   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
87     : //mDoNotSignal(false),
88       mReader(0),
89       mTotalMem(0),
90       mTotalMemMax(1000000)
91   {
92     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
93     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
94
95           mDone = false;
96     // Create the threads
97     for (int i=0; i<number_of_threads; i++) 
98       {
99                   //ThreadedImageReader* t = new ThreadedImageReader(this);
100                   boost::shared_ptr<ThreadedImageReader> t(new ThreadedImageReader(this), ThreadedImageReader::deleter());
101         mThreadedImageReaderList.push_back(t);
102          std::cout << "  ===> Thread "<<i
103                       <<" successfully added"<< std::endl;
104       }
105     mNumberOfThreadedReadersRunning = 0;
106     // Init the queue
107     mQueue.set(mComparator);
108     mQueue.set(mIndexer);
109     // 
110     // no thread : alloc self reader
111 //    if (number_of_threads==0)
112 //      {
113         mReader = new ImageReader();
114 //      }
115   }
116   //=====================================================================
117
118
119   //=====================================================================
120   bool MultiThreadImageReader::Start()
121   {
122     //    std::cout << "#### MultiThreadImageReader::Start()"
123     //                <<std::endl;
124           if (mNumberOfThreadedReadersRunning > 0) return true;
125           
126     ThreadedImageReaderListType::iterator i;
127     for (i =mThreadedImageReaderList.begin();
128          i!=mThreadedImageReaderList.end();
129          i++)
130       {
131         (*i)->Create();
132         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
133           {
134             std::cout << "ERROR starting a thread"<< std::endl;
135             return false;
136           }
137         else 
138           {
139                     std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
140                               <<" successfully created"<< std::endl;
141             
142           }
143       }
144     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
145     //    std::cout << "EO Start : #Threads running = "
146     //                << mNumberOfThreadedReadersRunning<<std::endl;
147
148     return true;
149   }
150   //=====================================================================
151
152   //=====================================================================
153   void MultiThreadImageReader::Stop()
154   { 
155 //                  std::cout << "#### MultiThreadImageReader::Stop()"
156 //            <<std::endl;
157   //  std::cout << "Sending stop order to the threads..."<<std::endl;
158           if (mDone) return;
159
160     ThreadedImageReaderListType::iterator i;
161     for (i =mThreadedImageReaderList.begin();
162          i!=mThreadedImageReaderList.end();
163          i++)
164       { std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
165                               <<" successfully stopped"<< std::endl;
166                   if((*i)->IsAlive())
167                   {(*i)->Pause();
168                           (*i).reset();
169                          //                       (*i)->Delete();
170                   }
171       }
172    mThreadedImageReaderList.clear();
173     // Wait a little to be sure that all threads have stopped
174     // A better way to do this ?
175     //    wxMilliSleep(1000);
176     // New method : the threads generate a stop event when they have finished
177     // We wait until all threads have stopped
178 //        std::cout << "Waiting for stop signals..."<<std::endl;
179     do 
180       {
181         // Sleep a little
182                 wxMilliSleep(10);
183         // Lock
184         {
185           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
186 //                std::cout << "#Threads running = "
187 //                          << mNumberOfThreadedReadersRunning<<std::endl;
188           // Break if all readers have stopped
189           if (mNumberOfThreadedReadersRunning <= 0) 
190             {
191               break;
192             }
193         }
194       } 
195     while (true);
196 //        std::cout << "All threads stopped : OK "<<std::endl;
197
198     ImageMapType::iterator j;
199     for (j =mImages.begin();
200          j!=mImages.end();
201          ++j)
202
203       {
204         delete j->first;
205       }
206     mImages.clear();
207         mDone = true;
208   }
209   //=====================================================================
210
211   //=====================================================================
212   MultiThreadImageReader::~MultiThreadImageReader()
213   {
214     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
215     //        <<std::endl;
216     Stop();
217     if (mReader) delete mReader;
218         mThreadedImageReaderList.clear();
219   }
220   //=====================================================================
221
222   //=====================================================================
223   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
224                                                     int priority)
225   {
226     // not in unload queue : ciao
227     if (p->UnloadIndex()<0) return;
228     int old_prio = p->GetPriority();
229     if (priority > old_prio) 
230       {
231         p->SetPriority(priority);
232         mUnloadQueue.downsort(p->UnloadIndex());
233       }
234     else if ( old_prio > priority )
235       {
236         p->SetPriority(priority);
237         mUnloadQueue.upsort(p->UnloadIndex());
238      }
239   }
240   //=====================================================================
241   // function to read attributes for a file
242   void MultiThreadImageReader::getAttributes(const std::string filename, 
243           std::map <std::string , std::string> &infos,std::vector<std::string> i_attr)
244   {
245           mReader->getAttributes(filename, infos, i_attr);
246   }
247
248   //=====================================================================
249   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
250                                         const std::string& filename, 
251                                         int priority )
252   {
253         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
254
255           if (mNumberOfThreadedReadersRunning==0)
256 //    if (mThreadedImageReaderList.size()==0) 
257       {
258         // no detached reader : use self reader
259         ImageToLoad itl(user,filename);
260         ImageMapType::iterator i = mImages.find(&itl);
261         if (i!=mImages.end())
262           {
263             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
264             // Already inserted
265             if (pitl->GetImage() != 0)
266               {
267                 // Already read
268                 pitl->SetUser(user);
269                 UpdateUnloadPriority(pitl,priority);
270                 SignalImageRead(pitl,false);
271                 return; // pitl->GetImage();
272               }
273           }
274         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
275         mImages[pitl] = 0;
276         pitl->SetImage(mReader->ReadImage(filename));
277         UpdateUnloadPriority(pitl,priority);
278         SignalImageRead(pitl,true);
279         //      return pitl->GetImage();
280         return;
281       }
282
283     ImageToLoad itl(user,filename);
284     ImageMapType::iterator i = mImages.find(&itl);
285     if (i!=mImages.end())
286       {
287         // Already inserted
288         if (i->first->GetImage() != 0)
289           {
290             // Already read : ok :signal the user
291             UpdateUnloadPriority(i->first,priority);
292             SignalImageRead(i->first,false);
293             return;
294           }
295         /// Already requested : change the priority
296         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
297         pitl->SetPriority(priority);
298         // Already in queue
299         if (pitl->Index()>=0) 
300           {
301             // Re-sort the queue
302             mQueue.upsort(pitl->Index());
303           }
304         // Not read but not in queue = being read = ok
305         else 
306           {
307             
308           }
309       }
310     else 
311       {
312         // Never requested before or unloaded 
313         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
314         mImages[pitl] = 0;
315         mQueue.insert(pitl);
316       }
317   }
318   //=====================================================================
319
320   //=====================================================================
321   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
322   (const std::string& filename,
323    MultiThreadImageReaderUser::EventType e,
324    vtkImageData* image)
325   {
326     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
327         (filename == mRequestedFilename))
328       {
329         mRequestedImage = image;
330       }
331     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
332       {
333         mNumberOfThreadedReadersRunning++;
334         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
335       }
336     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
337       {
338         
339                  mNumberOfThreadedReadersRunning--;
340         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
341       }
342   }
343   //=====================================================================
344
345   //=====================================================================
346   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
347   {
348          // Start();
349     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
350     //           <<std::endl;
351     
352     do 
353       {
354         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
355                 
356         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
357         //             <<"') lock ok"
358         //               <<std::endl;
359     
360         //                if (mNumberOfThreadedReadersRunning==0)
361         //      if (mThreadedImageReaderList.size()==0)
362         if (true)
363           {
364             ImageToLoad itl(this,filename);
365             ImageMapType::iterator i = mImages.find(&itl);
366             if (i!=mImages.end())
367               {
368                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
369                 // Already inserted
370                 if (pitl->GetImage() != 0)
371                   {
372                     // Already read
373                     UpdateUnloadPriority(pitl,
374                                          GetMaximalPriorityWithoutLocking()+1);
375                     return pitl->GetImage();
376                   }
377               }
378             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
379             mImages[pitl] = 0;
380             pitl->SetImage(mReader->ReadImage(filename));
381             UpdateUnloadPriority(pitl,
382                                  GetMaximalPriorityWithoutLocking()+1);
383             return pitl->GetImage();
384           }
385
386         /*      
387         mRequestedFilename = filename;
388         mRequestedImage = 0;
389         ImageToLoad itl(this,filename);
390         ImageMapType::iterator i = mImages.find(&itl);
391         if (i!=mImages.end())
392           {
393             // Already inserted in queue
394             if (i->first->GetImage() != 0)
395               {
396                 // Already read : ok : return it 
397                 return i->first->GetImage();
398               }
399             /// Already requested : change the priority
400               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
401               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
402               pitl->SetUser( this );
403               // Already in queue
404               if (pitl->Index()>=0) 
405                 {
406                   // Re-sort the queue
407                   mQueue.upsort(pitl->Index());
408                 }
409               // Not read but not in queue = being read = ok
410               else 
411                 {
412                   pitl->SetUser( this );
413                 }
414           }
415         else 
416           {
417             
418             // Never requested before or unloaded 
419             ImageToLoadPtr pitl = 
420               new ImageToLoad(this,filename,
421                               GetMaximalPriorityWithoutLocking() + 1);
422             mImages[pitl] = 0;
423             mQueue.insert(pitl);
424           }
425         */
426       }
427     while (0);
428
429     //    std::cout << "Waiting..."<<std::endl;
430
431     /*
432     // Waiting that it is read
433     int n = 0;
434     do 
435       {
436         //      std::cout << n++ << std::endl;
437         wxMilliSleep(10);
438         do 
439           {
440             //      wxMutexLocker lock(mMutex);
441             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
442             if (mRequestedImage!=0) 
443               {
444                 return mRequestedImage;
445               } 
446           }
447         while (0);
448       }
449     while (true);
450     // 
451     */
452   }
453   //=====================================================================
454   
455   //=====================================================================
456   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
457                                                bool purge)
458   {
459     
460 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
461     //    std::cout << "this="<<this <<std::endl;
462     //    std::cout << "user="<<p->GetUser() <<std::endl;
463
464     if ( p->GetUser() == this ) 
465         {
466       GetMultiThreadImageReaderUserMutex().Unlock();
467         }
468
469     p->GetUser()->MultiThreadImageReaderSendEvent
470       (p->GetFilename(),
471        MultiThreadImageReaderUser::ImageLoaded,
472        p->GetImage());
473
474     /*
475       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
476       BUGGY : TO FIX 
477     */
478     if (!purge)  return;
479     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
480
481     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
482            
483     mUnloadQueue.insert(p);
484
485
486 //EED 2017-01-01 Migration VTK7
487 #if VTK_MAJOR_VERSION <= 5
488     p->GetImage()->UpdateInformation();
489     p->GetImage()->PropagateUpdateExtent();
490     long ImMem = p->GetImage()->GetEstimatedMemorySize();
491 #else
492         int ext[6];
493         int dim[3];
494         p->GetImage()->GetExtent(ext);
495         dim[0]          = ext[1]-ext[0]+1;
496         dim[1]          = ext[3]-ext[2]+1;
497         dim[2]          = ext[5]-ext[4]+1;
498     long ImMem  = dim[0]*dim[1]*dim[2]*p->GetImage()->GetScalarSize();;
499 #endif
500     mTotalMem += ImMem;
501
502     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
503     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
504
505     //  return;
506
507     while (mTotalMem > mTotalMemMax)
508       {
509         GimmickMessage(5,
510                        "   ! Exceeded max of "
511                        << mTotalMemMax << " Ko : unloading oldest image ... "
512                        << std::endl);
513         if ( mUnloadQueue.size() <= 1 ) 
514           {
515              GimmickMessage(5,
516                             "   Only one image : cannot load AND unload it !!"
517                             <<std::endl);
518             break; 
519             
520           }
521         ImageToLoadPtr unload = mUnloadQueue.remove_top();
522         MultiThreadImageReaderUser* user = unload->GetUser();
523
524         /*
525         if ((user!=0)&&(user!=this)) 
526           {
527             user->GetMultiThreadImageReaderUserMutex().Lock();
528           }
529         */
530
531         std::string filename = unload->GetFilename();
532
533         GimmickMessage(5,"'" << filename << "'" << std::endl);
534
535 //EED 2017-01-01 Migration VTK7
536 #if VTK_MAJOR_VERSION <= 5
537         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
538 #else
539         int ext[6];
540         int dim[3];
541         unload->GetImage()->GetExtent(ext);
542         dim[0]          = ext[1]-ext[0]+1;
543         dim[1]          = ext[3]-ext[2]+1;
544         dim[2]          = ext[5]-ext[4]+1;
545         mTotalMem -= dim[0]*dim[1]*dim[2]*unload->GetImage()->GetScalarSize();
546 #endif
547
548         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
549
550         if (user!=0) 
551           {
552             //      std::cout << "unlock..."<<std::endl;
553             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
554             //      std::cout << "event"<<std::endl;
555             user->MultiThreadImageReaderSendEvent
556               (filename,
557                MultiThreadImageReaderUser::ImageUnloaded,
558                0);
559             //      std::cout << "event ok"<<std::endl;
560           }     
561
562         if (unload->Index()>=0)
563           {
564             // GimmickMessage(5,"still in queue"<<std::endl);
565           }
566         unload->Index() = -1;
567
568
569         ImageMapType::iterator it = mImages.find(unload);
570         if (it!=mImages.end())
571           {
572             mImages.erase(it);
573           }
574         //          std::cout << "delete..."<<std::endl;
575         delete unload;
576         //          std::cout << "delete ok."<<std::endl;
577
578       }
579   }
580   //=====================================================================
581
582   //=====================================================================
583   int MultiThreadImageReader::GetMaximalPriority()
584   { 
585     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
586     return GetMaximalPriorityWithoutLocking();
587   }
588   //=====================================================================
589
590
591   //=====================================================================
592   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
593   { 
594     long max = 0;
595     if (mQueue.size()>0) 
596       {
597         max = mQueue.top()->GetPriority();
598       }
599     if (mUnloadQueue.size()>0)
600       {
601         int max2 = mUnloadQueue.top()->GetPriority();
602         if (max2>max) max=max2;
603       }
604     return max;
605   }
606   //=====================================================================
607
608
609   //=====================================================================
610   //=====================================================================
611   //=====================================================================
612   //=====================================================================
613
614   //=====================================================================
615   void*  ThreadedImageReader::Entry()
616   {
617     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
618     //                << std::endl;
619
620     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
621       ("",
622        MultiThreadImageReaderUser::ThreadedReaderStarted,
623        0);
624
625     // While was not deleted 
626     while (!TestDestroy())
627       {
628                 //std::cout << "### Thread "<<GetCurrentId()<<" still alive"  << std::endl;
629           
630         // Lock the mutex
631         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
632         //mMutex.Lock();
633         // If image in queue
634         if (mMultiThreadImageReader->mQueue.size()>0)
635           {
636             MultiThreadImageReader::ImageToLoadPtr i = 
637               mMultiThreadImageReader->mQueue.remove_top();
638
639             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
640             //mMutex.Unlock();
641
642             
643             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
644             //                << i->GetFilename() << "'" << std::endl;
645             
646             // Do the job
647             vtkImageData* im = Read(i->GetFilename());
648
649             // Store it in the map
650             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
651             //mMutex.Lock();
652             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
653             MultiThreadImageReader::ImageMapType::iterator it = 
654               mMultiThreadImageReader->mImages.find(&itl);
655             MultiThreadImageReader::ImageToLoadPtr 
656               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
657               (it->first);
658             pitl->SetImage(im);
659             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
660             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
661             
662             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
663             //                << i->GetFilename() << "' : DONE" << std::endl;
664             
665           }
666         else 
667           {
668             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
669             //mMutex.Unlock();
670             // Wait a little to avoid blocking 
671             Sleep(10);
672           }
673       };
674     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
675     //                << std::endl;
676        
677     return 0;
678   }
679   //=====================================================================
680
681   //=====================================================================
682   void ThreadedImageReader::OnExit()
683   {
684     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
685       ("",
686        MultiThreadImageReaderUser::ThreadedReaderStopped,
687        0);
688   }
689   //=====================================================================
690
691   //=====================================================================
692   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
693   {
694     return mReader.ReadImage(filename);
695   }
696   //=====================================================================
697
698 } // namespace creaImageIO