]> Creatis software - creaImageIO.git/blob - src2/creaImageIOMultiThreadImageReader.cpp
clean-up
[creaImageIO.git] / src2 / creaImageIOMultiThreadImageReader.cpp
1 #include <creaImageIOMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 #include <wx/utils.h>
4 #include <creaImageIOSystem.h>
5
6
7 namespace creaImageIO
8 {
9
10   //=====================================================================
11   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
12   ( const std::string& filename,
13     EventType type,
14     vtkImageData* image)
15   {
16     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
17
18     this->OnMultiThreadImageReaderEvent(filename,type,image);
19   }
20   //=====================================================================
21
22   //=====================================================================
23   class ThreadedImageReader: public wxThread
24   {
25   public:
26     ThreadedImageReader(MultiThreadImageReader* tir) :
27       mMultiThreadImageReader(tir)
28     {}
29
30     void* Entry();
31     void  OnExit();
32
33     vtkImageData* Read(const std::string& filename);
34     
35         struct deleter
36         {
37                 void operator()(ThreadedImageReader* p)
38                 {
39                         p->Delete();
40                 }
41         };
42         friend struct deleter;
43
44
45   private:
46     ImageReader mReader;
47     MultiThreadImageReader* mMultiThreadImageReader;
48         
49   };
50
51   //=====================================================================
52
53   
54   //=====================================================================
55   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
56     : //mDoNotSignal(false),
57       mReader(0),
58       mTotalMem(0),
59       mTotalMemMax(1000000)
60   {
61     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
62     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
63
64     // Create the threads
65     for (int i=0; i<number_of_threads; i++) 
66       {
67                   //ThreadedImageReader* t = new ThreadedImageReader(this);
68                   boost::shared_ptr<ThreadedImageReader> t(new ThreadedImageReader(this), ThreadedImageReader::deleter());
69         mThreadedImageReaderList.push_back(t);
70          std::cout << "  ===> Thread "<<i
71                       <<" successfully added"<< std::endl;
72       }
73     mNumberOfThreadedReadersRunning = 0;
74     // Init the queue
75     mQueue.set(mComparator);
76     mQueue.set(mIndexer);
77     // 
78     // no thread : alloc self reader
79 //    if (number_of_threads==0)
80 //      {
81         mReader = new ImageReader();
82 //      }
83   }
84   //=====================================================================
85
86
87   //=====================================================================
88   bool MultiThreadImageReader::Start()
89   {
90
91     //    std::cout << "#### MultiThreadImageReader::Start()"
92     //                <<std::endl;
93           if (mNumberOfThreadedReadersRunning > 0) return true;
94           
95     ThreadedImageReaderListType::iterator i;
96     for (i =mThreadedImageReaderList.begin();
97          i!=mThreadedImageReaderList.end();
98          i++)
99       {
100         (*i)->Create();
101         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
102           {
103             std::cout << "ERROR starting a thread"<< std::endl;
104             return false;
105           }
106         else 
107           {
108                     std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
109                               <<" successfully created"<< std::endl;
110             
111           }
112       }
113     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
114     //    std::cout << "EO Start : #Threads running = "
115     //                << mNumberOfThreadedReadersRunning<<std::endl;
116
117     return true;
118   }
119   //=====================================================================
120
121   //=====================================================================
122   void MultiThreadImageReader::Stop()
123   { 
124 //                  std::cout << "#### MultiThreadImageReader::Stop()"
125 //            <<std::endl;
126   //  std::cout << "Sending stop order to the threads..."<<std::endl;
127
128     ThreadedImageReaderListType::iterator i;
129     for (i =mThreadedImageReaderList.begin();
130          i!=mThreadedImageReaderList.end();
131          i++)
132       { std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
133                               <<" successfully stopped"<< std::endl;
134                   if((*i)->IsAlive())
135                   {
136                           (*i).reset();
137                          //(*i)->Delete();
138                   }
139       }
140  //  mThreadedImageReaderList.clear();
141     // Wait a little to be sure that all threads have stopped
142     // A better way to do this ?
143     //    wxMilliSleep(1000);
144     // New method : the threads generate a stop event when they have finished
145     // We wait until all threads have stopped
146 //        std::cout << "Waiting for stop signals..."<<std::endl;
147     do 
148       {
149         // Sleep a little
150                 wxMilliSleep(10);
151         // Lock
152         {
153           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
154 //                std::cout << "#Threads running = "
155 //                          << mNumberOfThreadedReadersRunning<<std::endl;
156           // Break if all readers have stopped
157           if (mNumberOfThreadedReadersRunning <= 0) 
158             {
159               break;
160             }
161         }
162       } 
163     while (true);
164 //        std::cout << "All threads stopped : OK "<<std::endl;
165
166     ImageMapType::iterator j;
167     for (j =mImages.begin();
168          j!=mImages.end();
169          ++j)
170
171       {
172         delete j->first;
173       }
174     mImages.clear();
175   }
176   //=====================================================================
177
178   //=====================================================================
179   MultiThreadImageReader::~MultiThreadImageReader()
180   {
181     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
182     //        <<std::endl;
183     Stop();
184     if (mReader) delete mReader;
185         mThreadedImageReaderList.clear();
186   }
187   //=====================================================================
188
189   //=====================================================================
190   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
191                                                     int priority)
192   {
193     // not in unload queue : ciao
194     if (p->UnloadIndex()<0) return;
195     int old_prio = p->GetPriority();
196     if (priority > old_prio) 
197       {
198         p->SetPriority(priority);
199         mUnloadQueue.downsort(p->UnloadIndex());
200       }
201     else if ( old_prio > priority )
202       {
203         p->SetPriority(priority);
204         mUnloadQueue.upsort(p->UnloadIndex());
205      }
206   }
207   //=====================================================================
208
209   //=====================================================================
210   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
211                                         const std::string& filename, 
212                                         int priority )
213   {
214         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
215
216           if (mNumberOfThreadedReadersRunning==0)
217 //    if (mThreadedImageReaderList.size()==0) 
218       {
219         // no detached reader : use self reader
220         ImageToLoad itl(user,filename);
221         ImageMapType::iterator i = mImages.find(&itl);
222         if (i!=mImages.end())
223           {
224             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
225             // Already inserted
226             if (pitl->GetImage() != 0)
227               {
228                 // Already read
229                 pitl->SetUser(user);
230                 UpdateUnloadPriority(pitl,priority);
231                 SignalImageRead(pitl,false);
232                 return; // pitl->GetImage();
233               }
234           }
235         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
236         mImages[pitl] = 0;
237         pitl->SetImage(mReader->ReadImage(filename));
238         UpdateUnloadPriority(pitl,priority);
239         SignalImageRead(pitl,true);
240         //      return pitl->GetImage();
241         return;
242       }
243
244     ImageToLoad itl(user,filename);
245     ImageMapType::iterator i = mImages.find(&itl);
246     if (i!=mImages.end())
247       {
248         // Already inserted
249         if (i->first->GetImage() != 0)
250           {
251             // Already read : ok :signal the user
252             UpdateUnloadPriority(i->first,priority);
253             SignalImageRead(i->first,false);
254             return;
255           }
256         /// Already requested : change the priority
257         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
258         pitl->SetPriority(priority);
259         // Already in queue
260         if (pitl->Index()>=0) 
261           {
262             // Re-sort the queue
263             mQueue.upsort(pitl->Index());
264           }
265         // Not read but not in queue = being read = ok
266         else 
267           {
268             
269           }
270       }
271     else 
272       {
273         // Never requested before or unloaded 
274         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
275         mImages[pitl] = 0;
276         mQueue.insert(pitl);
277       }
278   }
279   //=====================================================================
280
281   //=====================================================================
282   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
283   (const std::string& filename,
284    MultiThreadImageReaderUser::EventType e,
285    vtkImageData* image)
286   {
287     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
288         (filename == mRequestedFilename))
289       {
290         mRequestedImage = image;
291       }
292     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
293       {
294         mNumberOfThreadedReadersRunning++;
295         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
296       }
297     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
298       {
299         
300                  mNumberOfThreadedReadersRunning--;
301         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
302       }
303   }
304   //=====================================================================
305
306   //=====================================================================
307   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
308   {
309          // Start();
310     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
311     //           <<std::endl;
312     
313     do 
314       {
315         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
316                 
317         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
318         //             <<"') lock ok"
319         //               <<std::endl;
320     
321         //                if (mNumberOfThreadedReadersRunning==0)
322         //      if (mThreadedImageReaderList.size()==0)
323         if (true)
324           {
325             ImageToLoad itl(this,filename);
326             ImageMapType::iterator i = mImages.find(&itl);
327             if (i!=mImages.end())
328               {
329                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
330                 // Already inserted
331                 if (pitl->GetImage() != 0)
332                   {
333                     // Already read
334                     UpdateUnloadPriority(pitl,
335                                          GetMaximalPriorityWithoutLocking()+1);
336                     return pitl->GetImage();
337                   }
338               }
339             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
340             mImages[pitl] = 0;
341             pitl->SetImage(mReader->ReadImage(filename));
342             UpdateUnloadPriority(pitl,
343                                  GetMaximalPriorityWithoutLocking()+1);
344             return pitl->GetImage();
345           }
346
347         /*      
348         mRequestedFilename = filename;
349         mRequestedImage = 0;
350         ImageToLoad itl(this,filename);
351         ImageMapType::iterator i = mImages.find(&itl);
352         if (i!=mImages.end())
353           {
354             // Already inserted in queue
355             if (i->first->GetImage() != 0)
356               {
357                 // Already read : ok : return it 
358                 return i->first->GetImage();
359               }
360             /// Already requested : change the priority
361               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
362               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
363               pitl->SetUser( this );
364               // Already in queue
365               if (pitl->Index()>=0) 
366                 {
367                   // Re-sort the queue
368                   mQueue.upsort(pitl->Index());
369                 }
370               // Not read but not in queue = being read = ok
371               else 
372                 {
373                   pitl->SetUser( this );
374                 }
375           }
376         else 
377           {
378             
379             // Never requested before or unloaded 
380             ImageToLoadPtr pitl = 
381               new ImageToLoad(this,filename,
382                               GetMaximalPriorityWithoutLocking() + 1);
383             mImages[pitl] = 0;
384             mQueue.insert(pitl);
385           }
386         */
387       }
388     while (0);
389
390     //    std::cout << "Waiting..."<<std::endl;
391
392     /*
393     // Waiting that it is read
394     int n = 0;
395     do 
396       {
397         //      std::cout << n++ << std::endl;
398         wxMilliSleep(10);
399         do 
400           {
401             //      wxMutexLocker lock(mMutex);
402             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
403             if (mRequestedImage!=0) 
404               {
405                 return mRequestedImage;
406               } 
407           }
408         while (0);
409       }
410     while (true);
411     // 
412     */
413   }
414   //=====================================================================
415   
416   //=====================================================================
417   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
418                                                bool purge)
419   {
420     
421 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
422     //    std::cout << "this="<<this <<std::endl;
423     //    std::cout << "user="<<p->GetUser() <<std::endl;
424
425     if ( p->GetUser() == this ) 
426       GetMultiThreadImageReaderUserMutex().Unlock();
427
428     p->GetUser()->MultiThreadImageReaderSendEvent
429       (p->GetFilename(),
430        MultiThreadImageReaderUser::ImageLoaded,
431        p->GetImage());
432
433     /*
434       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
435       BUGGY : TO FIX 
436     */
437     if (!purge)  return;
438     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
439
440     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
441            
442     mUnloadQueue.insert(p);
443     p->GetImage()->UpdateInformation();
444     p->GetImage()->PropagateUpdateExtent();
445     long ImMem = p->GetImage()->GetEstimatedMemorySize();
446     mTotalMem += ImMem;
447
448     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
449     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
450
451     //  return;
452
453     while (mTotalMem > mTotalMemMax)
454       {
455         GimmickMessage(5,
456                        "   ! Exceeded max of "
457                        << mTotalMemMax << " Ko : unloading oldest image ... "
458                        << std::endl);
459         if ( mUnloadQueue.size() <= 1 ) 
460           {
461              GimmickMessage(5,
462                             "   Only one image : cannot load AND unload it !!"
463                             <<std::endl);
464             break; 
465             
466           }
467         ImageToLoadPtr unload = mUnloadQueue.remove_top();
468         MultiThreadImageReaderUser* user = unload->GetUser();
469
470         /*
471         if ((user!=0)&&(user!=this)) 
472           {
473             user->GetMultiThreadImageReaderUserMutex().Lock();
474           }
475         */
476
477         std::string filename = unload->GetFilename();
478
479         GimmickMessage(5,"'" << filename << "'" << std::endl);
480         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
481
482         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
483
484         if (user!=0) 
485           {
486             //      std::cout << "unlock..."<<std::endl;
487             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
488             //      std::cout << "event"<<std::endl;
489             user->MultiThreadImageReaderSendEvent
490               (filename,
491                MultiThreadImageReaderUser::ImageUnloaded,
492                0);
493             //      std::cout << "event ok"<<std::endl;
494           }     
495
496         if (unload->Index()>=0)
497           {
498             // GimmickMessage(5,"still in queue"<<std::endl);
499           }
500         unload->Index() = -1;
501
502
503         ImageMapType::iterator it = mImages.find(unload);
504         if (it!=mImages.end())
505           {
506             mImages.erase(it);
507           }
508         //          std::cout << "delete..."<<std::endl;
509         delete unload;
510         //          std::cout << "delete ok."<<std::endl;
511
512       }
513   }
514   //=====================================================================
515
516   //=====================================================================
517   int MultiThreadImageReader::GetMaximalPriority()
518   { 
519     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
520     return GetMaximalPriorityWithoutLocking();
521   }
522   //=====================================================================
523
524
525   //=====================================================================
526   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
527   { 
528     long max = 0;
529     if (mQueue.size()>0) 
530       {
531         max = mQueue.top()->GetPriority();
532       }
533     if (mUnloadQueue.size()>0)
534       {
535         int max2 = mUnloadQueue.top()->GetPriority();
536         if (max2>max) max=max2;
537       }
538     return max;
539   }
540   //=====================================================================
541
542
543   //=====================================================================
544   //=====================================================================
545   //=====================================================================
546   //=====================================================================
547
548   //=====================================================================
549   void*  ThreadedImageReader::Entry()
550   {
551     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
552     //                << std::endl;
553
554     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
555       ("",
556        MultiThreadImageReaderUser::ThreadedReaderStarted,
557        0);
558
559     // While was not deleted 
560     while (!TestDestroy())
561       {
562                 //std::cout << "### Thread "<<GetCurrentId()<<" still alive"  << std::endl;
563           
564         // Lock the mutex
565         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
566         //mMutex.Lock();
567         // If image in queue
568         if (mMultiThreadImageReader->mQueue.size()>0)
569           {
570             MultiThreadImageReader::ImageToLoadPtr i = 
571               mMultiThreadImageReader->mQueue.remove_top();
572
573             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
574             //mMutex.Unlock();
575
576             
577             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
578             //                << i->GetFilename() << "'" << std::endl;
579             
580             // Do the job
581             vtkImageData* im = Read(i->GetFilename());
582
583             // Store it in the map
584             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
585             //mMutex.Lock();
586             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
587             MultiThreadImageReader::ImageMapType::iterator it = 
588               mMultiThreadImageReader->mImages.find(&itl);
589             MultiThreadImageReader::ImageToLoadPtr 
590               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
591               (it->first);
592             pitl->SetImage(im);
593             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
594             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
595             
596             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
597             //                << i->GetFilename() << "' : DONE" << std::endl;
598             
599           }
600         else 
601           {
602             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
603             //mMutex.Unlock();
604             // Wait a little to avoid blocking 
605             Sleep(10);
606           }
607       };
608     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
609     //                << std::endl;
610        
611     return 0;
612   }
613   //=====================================================================
614
615   //=====================================================================
616   void ThreadedImageReader::OnExit()
617   {
618     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
619       ("",
620        MultiThreadImageReaderUser::ThreadedReaderStopped,
621        0);
622   }
623   //=====================================================================
624
625   //=====================================================================
626   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
627   {
628     return mReader.ReadImage(filename);
629   }
630   //=====================================================================
631
632 } // namespace creaImageIO