]> Creatis software - creaImageIO.git/blob - src/creaImageIOQMultiThreadImageReader.cpp
c0a9f4d97008377d2255050ddda7dccfc2f88f06
[creaImageIO.git] / src / creaImageIOQMultiThreadImageReader.cpp
1 #include <creaImageIOQMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 //#include <wx/utils.h>
4 #include <QThread>
5 #include <creaImageIOSystem.h>
6
7 #include <creaImageIOGimmick.h>
8 #ifdef _DEBUG
9 #define new DEBUG_NEW
10 #endif
11 namespace creaImageIO
12 {
13
14   //=====================================================================
15   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
16   ( const std::string& filename,
17     EventType type,
18     vtkImageData* image)
19   {
20     QMutexLocker lock(&mMultiThreadImageReaderUserMutex);
21
22     this->OnMultiThreadImageReaderEvent(filename,type,image);
23   }
24   //=====================================================================
25
26   //=====================================================================
27   class ThreadedImageReader: public QThread
28   {
29   public:
30     ThreadedImageReader(MultiThreadImageReader* tir) :
31       mMultiThreadImageReader(tir)
32     {}
33
34     void* Entry();
35     void  OnExit();
36
37     vtkImageData* Read(const std::string& filename);
38     
39         struct deleter
40         {
41                 void operator()(ThreadedImageReader* p)
42                 {
43                         delete p;
44                 }
45         };
46         friend struct deleter;
47
48
49   private:
50     ImageReader mReader;
51     MultiThreadImageReader* mMultiThreadImageReader;
52         
53   };
54
55   //=====================================================================
56
57   
58   //=====================================================================
59   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
60     : //mDoNotSignal(false),
61       mReader(0),
62       mTotalMem(0),
63       mTotalMemMax(1000000)
64   {
65     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
66     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
67
68           mDone = false;
69     // Create the threads
70     for (int i=0; i<number_of_threads; i++) 
71       {
72                   //ThreadedImageReader* t = new ThreadedImageReader(this);
73                   boost::shared_ptr<ThreadedImageReader> t(new ThreadedImageReader(this), ThreadedImageReader::deleter());
74         mThreadedImageReaderList.push_back(t);
75          std::cout << "  ===> Thread "<<i
76                       <<" successfully added"<< std::endl;
77       }
78     mNumberOfThreadedReadersRunning = 0;
79     // Init the queue
80     mQueue.set(mComparator);
81     mQueue.set(mIndexer);
82     // 
83     // no thread : alloc self reader
84 //    if (number_of_threads==0)
85 //      {
86         mReader = new ImageReader();
87 //      }
88   }
89   //=====================================================================
90
91
92   //=====================================================================
93   bool MultiThreadImageReader::Start()
94   {
95
96     //    std::cout << "#### MultiThreadImageReader::Start()"
97     //                <<std::endl;
98           if (mNumberOfThreadedReadersRunning > 0) return true;
99           
100     ThreadedImageReaderListType::iterator i;
101     for (i =mThreadedImageReaderList.begin();
102          i!=mThreadedImageReaderList.end();
103          i++)
104       {
105         (*i)->start();
106         if ( !(*i)->isRunning() )
107           {
108             std::cout << "ERROR starting a thread"<< std::endl;
109             return false;
110           }
111         else 
112           {
113                     std::cout << "  ===> Thread "<<(*i)->currentThreadId()
114                               <<" successfully created"<< std::endl;
115             
116           }
117       }
118     QMutexLocker locker(GetMultiThreadImageReaderUserMutex());
119     //    std::cout << "EO Start : #Threads running = "
120     //                << mNumberOfThreadedReadersRunning<<std::endl;
121
122     return true;
123   }
124   //=====================================================================
125
126   //=====================================================================
127   void MultiThreadImageReader::Stop()
128   { 
129 //                  std::cout << "#### MultiThreadImageReader::Stop()"
130 //            <<std::endl;
131   //  std::cout << "Sending stop order to the threads..."<<std::endl;
132           if (mDone) return;
133
134     ThreadedImageReaderListType::iterator i;
135     for (i =mThreadedImageReaderList.begin();
136          i!=mThreadedImageReaderList.end();
137          i++)
138       { std::cout << "  ===> Thread "<<(*i)->currentThreadId()
139                               <<" successfully stopped"<< std::endl;
140                   if(!(*i)->isFinished())
141                   {(*i)->wait();
142                           (*i).reset();
143                          //                       (*i)->Delete();
144                   }
145       }
146    mThreadedImageReaderList.clear();
147     // Wait a little to be sure that all threads have stopped
148     // A better way to do this ?
149     //    wxMilliSleep(1000);
150     // New method : the threads generate a stop event when they have finished
151     // We wait until all threads have stopped
152 //        std::cout << "Waiting for stop signals..."<<std::endl;
153     do 
154       {
155         // Sleep a little
156        //   QThread::msleep(10);
157         // Lock
158         {
159           QMutexLocker locker(GetMultiThreadImageReaderUserMutex());
160 //                std::cout << "#Threads running = "
161 //                          << mNumberOfThreadedReadersRunning<<std::endl;
162           // Break if all readers have stopped
163           if (mNumberOfThreadedReadersRunning <= 0) 
164             {
165               break;
166             }
167         }
168       } 
169     while (true);
170 //        std::cout << "All threads stopped : OK "<<std::endl;
171
172     ImageMapType::iterator j;
173     for (j =mImages.begin();
174          j!=mImages.end();
175          ++j)
176
177       {
178         delete j->first;
179       }
180     mImages.clear();
181         mDone = true;
182   }
183   //=====================================================================
184
185   //=====================================================================
186   MultiThreadImageReader::~MultiThreadImageReader()
187   {
188     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
189     //        <<std::endl;
190     Stop();
191     if (mReader) delete mReader;
192         mThreadedImageReaderList.clear();
193   }
194   //=====================================================================
195
196   //=====================================================================
197   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
198                                                     int priority)
199   {
200     // not in unload queue : ciao
201     if (p->UnloadIndex()<0) return;
202     int old_prio = p->GetPriority();
203     if (priority > old_prio) 
204       {
205         p->SetPriority(priority);
206         mUnloadQueue.downsort(p->UnloadIndex());
207       }
208     else if ( old_prio > priority )
209       {
210         p->SetPriority(priority);
211         mUnloadQueue.upsort(p->UnloadIndex());
212      }
213   }
214   //=====================================================================
215   // function to read attributes for a file
216   void MultiThreadImageReader::getAttributes(const std::string filename, 
217           std::map <std::string , std::string> &infos,std::vector<std::string> i_attr)
218   {
219           mReader->getAttributes(filename, infos, i_attr);
220   }
221
222   //=====================================================================
223   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
224                                         const std::string& filename, 
225                                         int priority )
226   {
227         QMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
228
229           if (mNumberOfThreadedReadersRunning==0)
230 //    if (mThreadedImageReaderList.size()==0) 
231       {
232         // no detached reader : use self reader
233         ImageToLoad itl(user,filename);
234         ImageMapType::iterator i = mImages.find(&itl);
235         if (i!=mImages.end())
236           {
237             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
238             // Already inserted
239             if (pitl->GetImage() != 0)
240               {
241                 // Already read
242                 pitl->SetUser(user);
243                 UpdateUnloadPriority(pitl,priority);
244                 SignalImageRead(pitl,false);
245                 return; // pitl->GetImage();
246               }
247           }
248         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
249         mImages[pitl] = 0;
250         pitl->SetImage(mReader->ReadImage(filename));
251         UpdateUnloadPriority(pitl,priority);
252         SignalImageRead(pitl,true);
253         //      return pitl->GetImage();
254         return;
255       }
256
257     ImageToLoad itl(user,filename);
258     ImageMapType::iterator i = mImages.find(&itl);
259     if (i!=mImages.end())
260       {
261         // Already inserted
262         if (i->first->GetImage() != 0)
263           {
264             // Already read : ok :signal the user
265             UpdateUnloadPriority(i->first,priority);
266             SignalImageRead(i->first,false);
267             return;
268           }
269         /// Already requested : change the priority
270         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
271         pitl->SetPriority(priority);
272         // Already in queue
273         if (pitl->Index()>=0) 
274           {
275             // Re-sort the queue
276             mQueue.upsort(pitl->Index());
277           }
278         // Not read but not in queue = being read = ok
279         else 
280           {
281             
282           }
283       }
284     else 
285       {
286         // Never requested before or unloaded 
287         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
288         mImages[pitl] = 0;
289         mQueue.insert(pitl);
290       }
291   }
292   //=====================================================================
293
294   //=====================================================================
295   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
296   (const std::string& filename,
297    MultiThreadImageReaderUser::EventType e,
298    vtkImageData* image)
299   {
300     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
301         (filename == mRequestedFilename))
302       {
303         mRequestedImage = image;
304       }
305     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
306       {
307         mNumberOfThreadedReadersRunning++;
308         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
309       }
310     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
311       {
312         
313                  mNumberOfThreadedReadersRunning--;
314         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
315       }
316   }
317   //=====================================================================
318
319   //=====================================================================
320   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
321   {
322          // Start();
323     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
324     //           <<std::endl;
325     
326     do 
327       {
328         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
329                 
330         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
331         //             <<"') lock ok"
332         //               <<std::endl;
333     
334         //                if (mNumberOfThreadedReadersRunning==0)
335         //      if (mThreadedImageReaderList.size()==0)
336         if (true)
337           {
338             ImageToLoad itl(this,filename);
339             ImageMapType::iterator i = mImages.find(&itl);
340             if (i!=mImages.end())
341               {
342                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
343                 // Already inserted
344                 if (pitl->GetImage() != 0)
345                   {
346                     // Already read
347                     UpdateUnloadPriority(pitl,
348                                          GetMaximalPriorityWithoutLocking()+1);
349                     return pitl->GetImage();
350                   }
351               }
352             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
353             mImages[pitl] = 0;
354             pitl->SetImage(mReader->ReadImage(filename));
355             UpdateUnloadPriority(pitl,
356                                  GetMaximalPriorityWithoutLocking()+1);
357             return pitl->GetImage();
358           }
359
360         /*      
361         mRequestedFilename = filename;
362         mRequestedImage = 0;
363         ImageToLoad itl(this,filename);
364         ImageMapType::iterator i = mImages.find(&itl);
365         if (i!=mImages.end())
366           {
367             // Already inserted in queue
368             if (i->first->GetImage() != 0)
369               {
370                 // Already read : ok : return it 
371                 return i->first->GetImage();
372               }
373             /// Already requested : change the priority
374               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
375               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
376               pitl->SetUser( this );
377               // Already in queue
378               if (pitl->Index()>=0) 
379                 {
380                   // Re-sort the queue
381                   mQueue.upsort(pitl->Index());
382                 }
383               // Not read but not in queue = being read = ok
384               else 
385                 {
386                   pitl->SetUser( this );
387                 }
388           }
389         else 
390           {
391             
392             // Never requested before or unloaded 
393             ImageToLoadPtr pitl = 
394               new ImageToLoad(this,filename,
395                               GetMaximalPriorityWithoutLocking() + 1);
396             mImages[pitl] = 0;
397             mQueue.insert(pitl);
398           }
399         */
400       }
401     while (0);
402
403     //    std::cout << "Waiting..."<<std::endl;
404
405     /*
406     // Waiting that it is read
407     int n = 0;
408     do 
409       {
410         //      std::cout << n++ << std::endl;
411         wxMilliSleep(10);
412         do 
413           {
414             //      wxMutexLocker lock(mMutex);
415             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
416             if (mRequestedImage!=0) 
417               {
418                 return mRequestedImage;
419               } 
420           }
421         while (0);
422       }
423     while (true);
424     // 
425     */
426   }
427   //=====================================================================
428   
429   //=====================================================================
430   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
431                                                bool purge)
432   {
433     
434 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
435     //    std::cout << "this="<<this <<std::endl;
436     //    std::cout << "user="<<p->GetUser() <<std::endl;
437
438     if ( p->GetUser() == this ) 
439       GetMultiThreadImageReaderUserMutex()->unlock();
440
441     p->GetUser()->MultiThreadImageReaderSendEvent
442       (p->GetFilename(),
443        MultiThreadImageReaderUser::ImageLoaded,
444        p->GetImage());
445
446     /*
447       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
448       BUGGY : TO FIX 
449     */
450     if (!purge)  return;
451     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
452
453     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
454            
455     mUnloadQueue.insert(p);
456     p->GetImage()->UpdateInformation();
457     p->GetImage()->PropagateUpdateExtent();
458     long ImMem = p->GetImage()->GetEstimatedMemorySize();
459     mTotalMem += ImMem;
460
461     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
462     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
463
464     //  return;
465
466     while (mTotalMem > mTotalMemMax)
467       {
468         GimmickMessage(5,
469                        "   ! Exceeded max of "
470                        << mTotalMemMax << " Ko : unloading oldest image ... "
471                        << std::endl);
472         if ( mUnloadQueue.size() <= 1 ) 
473           {
474              GimmickMessage(5,
475                             "   Only one image : cannot load AND unload it !!"
476                             <<std::endl);
477             break; 
478             
479           }
480         ImageToLoadPtr unload = mUnloadQueue.remove_top();
481         MultiThreadImageReaderUser* user = unload->GetUser();
482
483         /*
484         if ((user!=0)&&(user!=this)) 
485           {
486             user->GetMultiThreadImageReaderUserMutex().Lock();
487           }
488         */
489
490         std::string filename = unload->GetFilename();
491
492         GimmickMessage(5,"'" << filename << "'" << std::endl);
493         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
494
495         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
496
497         if (user!=0) 
498           {
499             //      std::cout << "unlock..."<<std::endl;
500             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
501             //      std::cout << "event"<<std::endl;
502             user->MultiThreadImageReaderSendEvent
503               (filename,
504                MultiThreadImageReaderUser::ImageUnloaded,
505                0);
506             //      std::cout << "event ok"<<std::endl;
507           }     
508
509         if (unload->Index()>=0)
510           {
511             // GimmickMessage(5,"still in queue"<<std::endl);
512           }
513         unload->Index() = -1;
514
515
516         ImageMapType::iterator it = mImages.find(unload);
517         if (it!=mImages.end())
518           {
519             mImages.erase(it);
520           }
521         //          std::cout << "delete..."<<std::endl;
522         delete unload;
523         //          std::cout << "delete ok."<<std::endl;
524
525       }
526   }
527   //=====================================================================
528
529   //=====================================================================
530   int MultiThreadImageReader::GetMaximalPriority()
531   { 
532     QMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
533     return GetMaximalPriorityWithoutLocking();
534   }
535   //=====================================================================
536
537
538   //=====================================================================
539   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
540   { 
541     long max = 0;
542     if (mQueue.size()>0) 
543       {
544         max = mQueue.top()->GetPriority();
545       }
546     if (mUnloadQueue.size()>0)
547       {
548         int max2 = mUnloadQueue.top()->GetPriority();
549         if (max2>max) max=max2;
550       }
551     return max;
552   }
553   //=====================================================================
554
555
556   //=====================================================================
557   //=====================================================================
558   //=====================================================================
559   //=====================================================================
560
561   //=====================================================================
562   void*  ThreadedImageReader::Entry()
563   {
564     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
565     //                << std::endl;
566
567     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
568       ("",
569        MultiThreadImageReaderUser::ThreadedReaderStarted,
570        0);
571
572     // While was not deleted 
573     while (!isFinished())
574       {
575                 //std::cout << "### Thread "<<GetCurrentId()<<" still alive"  << std::endl;
576           
577         // Lock the mutex
578         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
579         //mMutex.Lock();
580         // If image in queue
581         if (mMultiThreadImageReader->mQueue.size()>0)
582           {
583             MultiThreadImageReader::ImageToLoadPtr i = 
584               mMultiThreadImageReader->mQueue.remove_top();
585
586             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
587             //mMutex.Unlock();
588
589             
590             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
591             //                << i->GetFilename() << "'" << std::endl;
592             
593             // Do the job
594             vtkImageData* im = Read(i->GetFilename());
595
596             // Store it in the map
597             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
598             //mMutex.Lock();
599             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
600             MultiThreadImageReader::ImageMapType::iterator it = 
601               mMultiThreadImageReader->mImages.find(&itl);
602             MultiThreadImageReader::ImageToLoadPtr 
603               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
604               (it->first);
605             pitl->SetImage(im);
606             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
607             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
608             
609             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
610             //                << i->GetFilename() << "' : DONE" << std::endl;
611             
612           }
613         else 
614           {
615             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
616             //mMutex.Unlock();
617             // Wait a little to avoid blocking 
618             Sleep(10);
619           }
620       };
621     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
622     //                << std::endl;
623        
624     return 0;
625   }
626   //=====================================================================
627
628   //=====================================================================
629   void ThreadedImageReader::OnExit()
630   {
631     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
632       ("",
633        MultiThreadImageReaderUser::ThreadedReaderStopped,
634        0);
635   }
636   //=====================================================================
637
638   //=====================================================================
639   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
640   {
641     return mReader.ReadImage(filename);
642   }
643   //=====================================================================
644
645 } // namespace creaImageIO