]> Creatis software - creaImageIO.git/blob - src2/creaImageIOMultiThreadImageReader.cpp
3f4ba3ab6413f38c722ab51e41fe848cad3141e3
[creaImageIO.git] / src2 / creaImageIOMultiThreadImageReader.cpp
1 #include <creaImageIOMultiThreadImageReader.h>
2 #include <creaImageIOImageReader.h>
3 #include <wx/utils.h>
4 #include <creaImageIOSystem.h>
5
6 namespace creaImageIO
7 {
8
9   //=====================================================================
10   void MultiThreadImageReaderUser::MultiThreadImageReaderSendEvent
11   ( const std::string& filename,
12     EventType type,
13     vtkImageData* image)
14   {
15     wxMutexLocker lock(mMultiThreadImageReaderUserMutex);
16
17     this->OnMultiThreadImageReaderEvent(filename,type,image);
18   }
19   //=====================================================================
20
21   //=====================================================================
22   class ThreadedImageReader: public wxThread
23   {
24   public:
25     ThreadedImageReader(MultiThreadImageReader* tir) :
26       mMultiThreadImageReader(tir)
27     {}
28
29     void* Entry();
30     void  OnExit();
31
32     vtkImageData* Read(const std::string& filename);
33     
34
35   private:
36     ImageReader mReader;
37     MultiThreadImageReader* mMultiThreadImageReader;
38         
39   };
40   //=====================================================================
41
42   
43   //=====================================================================
44   MultiThreadImageReader::MultiThreadImageReader(int number_of_threads)
45     : //mDoNotSignal(false),
46       mReader(0),
47       mTotalMem(0),
48       mTotalMemMax(1000000)
49   {
50     //    std::cout << "#### MultiThreadImageReader::MultiThreadImageReader("
51     //        << " #threads= " << number_of_threads <<" )"<<std::endl;
52
53     // Create the threads
54     for (int i=0; i<number_of_threads; i++) 
55       {
56         ThreadedImageReader* t = new ThreadedImageReader(this);
57         mThreadedImageReaderList.push_back(t);
58          std::cout << "  ===> Thread "<<i
59                               <<" successfully added"<< std::endl;
60       }
61     mNumberOfThreadedReadersRunning = 0;
62     // Init the queue
63     mQueue.set(mComparator);
64     mQueue.set(mIndexer);
65     // 
66     // no thread : alloc self reader
67 //    if (number_of_threads==0)
68 //      {
69         mReader = new ImageReader();
70 //      }
71   }
72   //=====================================================================
73
74
75   //=====================================================================
76   bool MultiThreadImageReader::Start()
77   {
78
79     //    std::cout << "#### MultiThreadImageReader::Start()"
80     //                <<std::endl;
81           if (mNumberOfThreadedReadersRunning > 0) return true;
82           
83     ThreadedImageReaderListType::iterator i;
84     for (i =mThreadedImageReaderList.begin();
85          i!=mThreadedImageReaderList.end();
86          i++)
87       {
88         (*i)->Create();
89         if ( (*i)->Run() != wxTHREAD_NO_ERROR )
90           {
91             std::cout << "ERROR starting a thread"<< std::endl;
92             return false;
93           }
94         else 
95           {
96                     std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
97                               <<" successfully created"<< std::endl;
98             
99           }
100       }
101     wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
102     //    std::cout << "EO Start : #Threads running = "
103     //                << mNumberOfThreadedReadersRunning<<std::endl;
104
105     return true;
106   }
107   //=====================================================================
108
109   //=====================================================================
110   void MultiThreadImageReader::Stop()
111   { 
112 //                  std::cout << "#### MultiThreadImageReader::Stop()"
113 //            <<std::endl;
114   //  std::cout << "Sending stop order to the threads..."<<std::endl;
115
116     ThreadedImageReaderListType::iterator i;
117     for (i =mThreadedImageReaderList.begin();
118          i!=mThreadedImageReaderList.end();
119          i++)
120       { std::cout << "  ===> Thread "<<(*i)->GetCurrentId()
121                               <<" successfully stopped"<< std::endl;
122                   if((*i)->IsAlive())
123                   {
124                         (*i)->Delete();
125                   }
126       }
127     mThreadedImageReaderList.clear();
128     // Wait a little to be sure that all threads have stopped
129     // A better way to do this ?
130     //    wxMilliSleep(1000);
131     // New method : the threads generate a stop event when they have finished
132     // We wait until all threads have stopped
133 //        std::cout << "Waiting for stop signals..."<<std::endl;
134     do 
135       {
136         // Sleep a little
137                 wxMilliSleep(10);
138         // Lock
139         {
140           wxMutexLocker locker(GetMultiThreadImageReaderUserMutex());
141 //                std::cout << "#Threads running = "
142 //                          << mNumberOfThreadedReadersRunning<<std::endl;
143           // Break if all readers have stopped
144           if (mNumberOfThreadedReadersRunning <= 0) 
145             {
146               break;
147             }
148         }
149       } 
150     while (true);
151 //        std::cout << "All threads stopped : OK "<<std::endl;
152
153     ImageMapType::iterator j;
154     for (j =mImages.begin();
155          j!=mImages.end();
156          ++j)
157
158       {
159         delete j->first;
160       }
161     mImages.clear();
162   }
163   //=====================================================================
164
165   //=====================================================================
166   MultiThreadImageReader::~MultiThreadImageReader()
167   {
168     //    std::cout << "#### MultiThreadImageReader::~MultiThreadImageReader()"
169     //        <<std::endl;
170     Stop();
171     if (mReader) delete mReader;
172   }
173   //=====================================================================
174
175   //=====================================================================
176   void MultiThreadImageReader::UpdateUnloadPriority(ImageToLoadPtr p, 
177                                                     int priority)
178   {
179     // not in unload queue : ciao
180     if (p->UnloadIndex()<0) return;
181     int old_prio = p->GetPriority();
182     if (priority > old_prio) 
183       {
184         p->SetPriority(priority);
185         mUnloadQueue.downsort(p->UnloadIndex());
186       }
187     else if ( old_prio > priority )
188       {
189         p->SetPriority(priority);
190         mUnloadQueue.upsort(p->UnloadIndex());
191      }
192   }
193   //=====================================================================
194
195   //=====================================================================
196   void MultiThreadImageReader::Request( MultiThreadImageReaderUser* user,
197                                         const std::string& filename, 
198                                         int priority )
199   {
200         wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
201     
202           if (mNumberOfThreadedReadersRunning==0)
203 //    if (mThreadedImageReaderList.size()==0) 
204       {
205         // no detached reader : use self reader
206         ImageToLoad itl(user,filename);
207         ImageMapType::iterator i = mImages.find(&itl);
208         if (i!=mImages.end())
209           {
210             ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
211             // Already inserted
212             if (pitl->GetImage() != 0)
213               {
214                 // Already read
215                 pitl->SetUser(user);
216                 UpdateUnloadPriority(pitl,priority);
217                 SignalImageRead(pitl,false);
218                 return; // pitl->GetImage();
219               }
220           }
221         ImageToLoadPtr pitl = new ImageToLoad(user,filename,0);
222         mImages[pitl] = 0;
223         pitl->SetImage(mReader->ReadImage(filename));
224         UpdateUnloadPriority(pitl,priority);
225         SignalImageRead(pitl,true);
226         //      return pitl->GetImage();
227         return;
228       }
229
230
231     ImageToLoad itl(user,filename);
232     ImageMapType::iterator i = mImages.find(&itl);
233     if (i!=mImages.end())
234       {
235         // Already inserted
236         if (i->first->GetImage() != 0)
237           {
238             // Already read : ok :signal the user
239             UpdateUnloadPriority(i->first,priority);
240             SignalImageRead(i->first,false);
241             return;
242           }
243         /// Already requested : change the priority
244         ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
245         pitl->SetPriority(priority);
246         // Already in queue
247         if (pitl->Index()>=0) 
248           {
249             // Re-sort the queue
250             mQueue.upsort(pitl->Index());
251           }
252         // Not read but not in queue = being read = ok
253         else 
254           {
255             
256           }
257       }
258     else 
259       {
260         // Never requested before or unloaded 
261         ImageToLoadPtr pitl = new ImageToLoad(user,filename,priority);
262         mImages[pitl] = 0;
263         mQueue.insert(pitl);
264       }
265   }
266   //=====================================================================
267
268   //=====================================================================
269   void MultiThreadImageReader::OnMultiThreadImageReaderEvent
270   (const std::string& filename,
271    MultiThreadImageReaderUser::EventType e,
272    vtkImageData* image)
273   {
274     if ((e==MultiThreadImageReaderUser::ImageLoaded) &&
275         (filename == mRequestedFilename))
276       {
277         mRequestedImage = image;
278       }
279     else if (e==MultiThreadImageReaderUser::ThreadedReaderStarted)
280       {
281         mNumberOfThreadedReadersRunning++;
282         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
283       }
284     else if (e==MultiThreadImageReaderUser::ThreadedReaderStopped)
285       {
286         
287                  mNumberOfThreadedReadersRunning--;
288         //      std::cout << "#TR=" << mNumberOfThreadedReadersRunning << std::endl;
289       }
290   }
291   //=====================================================================
292
293   //=====================================================================
294   vtkImageData* MultiThreadImageReader::GetImage(const std::string& filename)
295   {
296          // Start();
297     //       std::cout << "** MultiThreadImageReader::GetImage('"<<filename<<"')"
298     //           <<std::endl;
299     
300     do 
301       {
302         //      wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
303                 
304         //     std::cout << "** MultiThreadImageReader::GetImage('"<<filename
305         //             <<"') lock ok"
306         //               <<std::endl;
307     
308         //                if (mNumberOfThreadedReadersRunning==0)
309         //      if (mThreadedImageReaderList.size()==0)
310         if (true)
311           {
312             ImageToLoad itl(this,filename);
313             ImageMapType::iterator i = mImages.find(&itl);
314             if (i!=mImages.end())
315               {
316                 ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
317                 // Already inserted
318                 if (pitl->GetImage() != 0)
319                   {
320                     // Already read
321                     UpdateUnloadPriority(pitl,
322                                          GetMaximalPriorityWithoutLocking()+1);
323                     return pitl->GetImage();
324                   }
325               }
326             ImageToLoadPtr pitl = new ImageToLoad(this,filename,0);
327             mImages[pitl] = 0;
328             pitl->SetImage(mReader->ReadImage(filename));
329             UpdateUnloadPriority(pitl,
330                                  GetMaximalPriorityWithoutLocking()+1);
331             return pitl->GetImage();
332           }
333
334         /*      
335         mRequestedFilename = filename;
336         mRequestedImage = 0;
337         ImageToLoad itl(this,filename);
338         ImageMapType::iterator i = mImages.find(&itl);
339         if (i!=mImages.end())
340           {
341             // Already inserted in queue
342             if (i->first->GetImage() != 0)
343               {
344                 // Already read : ok : return it 
345                 return i->first->GetImage();
346               }
347             /// Already requested : change the priority
348               ImageToLoadPtr pitl = const_cast<ImageToLoadPtr>(i->first);
349               pitl->SetPriority( GetMaximalPriorityWithoutLocking() + 1 );
350               pitl->SetUser( this );
351               // Already in queue
352               if (pitl->Index()>=0) 
353                 {
354                   // Re-sort the queue
355                   mQueue.upsort(pitl->Index());
356                 }
357               // Not read but not in queue = being read = ok
358               else 
359                 {
360                   pitl->SetUser( this );
361                 }
362           }
363         else 
364           {
365             
366             // Never requested before or unloaded 
367             ImageToLoadPtr pitl = 
368               new ImageToLoad(this,filename,
369                               GetMaximalPriorityWithoutLocking() + 1);
370             mImages[pitl] = 0;
371             mQueue.insert(pitl);
372           }
373         */
374       }
375     while (0);
376
377     //    std::cout << "Waiting..."<<std::endl;
378
379     /*
380     // Waiting that it is read
381     int n = 0;
382     do 
383       {
384         //      std::cout << n++ << std::endl;
385         wxMilliSleep(10);
386         do 
387           {
388             //      wxMutexLocker lock(mMutex);
389             wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
390             if (mRequestedImage!=0) 
391               {
392                 return mRequestedImage;
393               } 
394           }
395         while (0);
396       }
397     while (true);
398     // 
399     */
400   }
401   //=====================================================================
402   
403   //=====================================================================
404   void MultiThreadImageReader::SignalImageRead(ImageToLoadPtr p, 
405                                                bool purge)
406   {
407     
408 //    std::cout << "MultiThreadImageReader::SignalImageRead" <<std::endl;
409     //    std::cout << "this="<<this <<std::endl;
410     //    std::cout << "user="<<p->GetUser() <<std::endl;
411
412     if ( p->GetUser() == this ) 
413       GetMultiThreadImageReaderUserMutex().Unlock();
414
415     p->GetUser()->MultiThreadImageReaderSendEvent
416       (p->GetFilename(),
417        MultiThreadImageReaderUser::ImageLoaded,
418        p->GetImage());
419
420     /*
421       AN ATTEMPT TO UNLOAD OLDEST IMAGE IF EXCEEDED A CERTAIN MEMORY QUOTA
422       BUGGY : TO FIX 
423     */
424     if (!purge)  return;
425     GimmickMessage(5,"Image '"<<p->GetFilename()<<"' read"<<std::endl);
426
427     //    wxMutexLocker lock(GetMultiThreadImageReaderUserMutex());
428            
429     mUnloadQueue.insert(p);
430     p->GetImage()->UpdateInformation();
431     p->GetImage()->PropagateUpdateExtent();
432     long ImMem = p->GetImage()->GetEstimatedMemorySize();
433     mTotalMem += ImMem;
434
435     GimmickMessage(5,"==> Image in memory = "<<mUnloadQueue.size()<<std::endl);
436     GimmickMessage(5,"==> Total mem       = "<<mTotalMem<<" Ko"<<std::endl);
437
438     //  return;
439
440     while (mTotalMem > mTotalMemMax)
441       {
442         GimmickMessage(5,
443                        "   ! Exceeded max of "
444                        << mTotalMemMax << " Ko : unloading oldest image ... "
445                        << std::endl);
446         if ( mUnloadQueue.size() <= 1 ) 
447           {
448              GimmickMessage(5,
449                             "   Only one image : cannot load AND unload it !!"
450                             <<std::endl);
451             break; 
452             
453           }
454         ImageToLoadPtr unload = mUnloadQueue.remove_top();
455         MultiThreadImageReaderUser* user = unload->GetUser();
456
457         /*
458         if ((user!=0)&&(user!=this)) 
459           {
460             user->GetMultiThreadImageReaderUserMutex().Lock();
461           }
462         */
463
464         std::string filename = unload->GetFilename();
465
466         GimmickMessage(5,"'" << filename << "'" << std::endl);
467         mTotalMem -= unload->GetImage()->GetEstimatedMemorySize();
468
469         GimmickMessage(5," ==> Total mem = "<<mTotalMem<<" Ko "<<std::endl);
470
471
472         if (user!=0) 
473           {
474             //      std::cout << "unlock..."<<std::endl;
475             //   user->GetMultiThreadImageReaderUserMutex().Unlock();
476             //      std::cout << "event"<<std::endl;
477             user->MultiThreadImageReaderSendEvent
478               (filename,
479                MultiThreadImageReaderUser::ImageUnloaded,
480                0);
481             //      std::cout << "event ok"<<std::endl;
482
483           }     
484
485         if (unload->Index()>=0)
486           {
487             // GimmickMessage(5,"still in queue"<<std::endl);
488           }
489         unload->Index() = -1;
490
491
492         ImageMapType::iterator it = mImages.find(unload);
493         if (it!=mImages.end())
494           {
495             mImages.erase(it);
496           }
497         //          std::cout << "delete..."<<std::endl;
498         delete unload;
499         //          std::cout << "delete ok."<<std::endl;
500
501       }
502     
503   
504   }
505   //=====================================================================
506
507   //=====================================================================
508   int MultiThreadImageReader::GetMaximalPriority()
509   { 
510     wxMutexLocker lock(GetMultiThreadImageReaderUserMutex()); //mMutex);
511     return GetMaximalPriorityWithoutLocking();
512   }
513   //=====================================================================
514
515
516   //=====================================================================
517   int MultiThreadImageReader::GetMaximalPriorityWithoutLocking()
518   { 
519     long max = 0;
520     if (mQueue.size()>0) 
521       {
522         max = mQueue.top()->GetPriority();
523       }
524     if (mUnloadQueue.size()>0)
525       {
526         int max2 = mUnloadQueue.top()->GetPriority();
527         if (max2>max) max=max2;
528       }
529     return max;
530   }
531   //=====================================================================
532
533
534   //=====================================================================
535   //=====================================================================
536   //=====================================================================
537   //=====================================================================
538
539   //=====================================================================
540   void*  ThreadedImageReader::Entry()
541   {
542     //    std::cout << "### Thread "<<GetCurrentId()<<"::Entry()"
543     //                << std::endl;
544
545     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
546       ("",
547        MultiThreadImageReaderUser::ThreadedReaderStarted,
548        0);
549
550     // While was not deleted 
551     while (!TestDestroy())
552       {
553                 //std::cout << "### Thread "<<GetCurrentId()<<" still alive"  << std::endl;
554           
555         // Lock the mutex
556         mMultiThreadImageReader->MultiThreadImageReaderEventLock();
557         //mMutex.Lock();
558         // If image in queue
559         if (mMultiThreadImageReader->mQueue.size()>0)
560           {
561             MultiThreadImageReader::ImageToLoadPtr i = 
562               mMultiThreadImageReader->mQueue.remove_top();
563
564             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
565             //mMutex.Unlock();
566
567             
568             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
569             //                << i->GetFilename() << "'" << std::endl;
570             
571             // Do the job
572             vtkImageData* im = Read(i->GetFilename());
573
574             // Store it in the map
575             mMultiThreadImageReader->MultiThreadImageReaderEventLock();
576             //mMutex.Lock();
577             MultiThreadImageReader::ImageToLoad itl(0,i->GetFilename());
578             MultiThreadImageReader::ImageMapType::iterator it = 
579               mMultiThreadImageReader->mImages.find(&itl);
580             MultiThreadImageReader::ImageToLoadPtr 
581               pitl = const_cast<MultiThreadImageReader::ImageToLoadPtr>
582               (it->first);
583             pitl->SetImage(im);
584             mMultiThreadImageReader->SignalImageRead(pitl,true);//i->GetFilename());
585             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();           //mMutex.Unlock();
586             
587             //      std::cout << "### Thread "<<GetCurrentId()<<" : reading '"
588             //                << i->GetFilename() << "' : DONE" << std::endl;
589             
590           }
591         else 
592           {
593             mMultiThreadImageReader->MultiThreadImageReaderEventUnlock();
594             //mMutex.Unlock();
595             // Wait a little to avoid blocking 
596             Sleep(10);
597           }
598       };
599     //    std::cout << "### Thread "<<GetCurrentId()<<" stopping"
600     //                << std::endl;
601        
602     return 0;
603   }
604   //=====================================================================
605
606   //=====================================================================
607   void ThreadedImageReader::OnExit()
608   {
609     mMultiThreadImageReader->MultiThreadImageReaderSendEvent
610       ("",
611        MultiThreadImageReaderUser::ThreadedReaderStopped,
612        0);
613   }
614   //=====================================================================
615
616   //=====================================================================
617   vtkImageData* ThreadedImageReader::Read(const std::string& filename)
618   {
619     return mReader.ReadImage(filename);
620   }
621   //=====================================================================
622
623 } // namespace creaImageIO