ML Reference
MeVis/Foundation/Sources/ML/include/host/mlMultiThreadedPageRequestProcessor.h
Go to the documentation of this file.
00001 // **InsertLicense** code
00002 //-------------------------------------------------------------------------
00008 //-------------------------------------------------------------------------
00009 #ifndef __mlMultiThreadedPageRequestProcessor_H
00010 #define __mlMultiThreadedPageRequestProcessor_H
00011 
00012 #include "mlInitSystemML.h"
00013 
00014 #include "mlPageRequestProcessorBase.h"
00015 #include "mlAtomicCounter.h"
00016 #include "mlProcessingTimeLine.h"
00017 
00018 #include <boost/thread.hpp>
00019 #include <mlMutex.h>
00020 #include <mlWaitCondition.h>
00021 
00022 ML_START_NAMESPACE
00023 
00024 //-------------------------------------------------------------------------------------------
00027 //-------------------------------------------------------------------------------------------
//! Page request processor derived from PageRequestProcessorBase that reports
//! multi-threading support (supportsMultiThreading() returns true).
//! The declarations below show two PageRequest queues, a list of per-thread
//! WorkerThreadState objects, a separate I/O thread state and a producer thread.
//! How these parts interact is defined in the implementation file, which is not
//! visible from this header; comments marked NOTE(review)/"presumably" are
//! assumptions derived from names and must be confirmed there.
00028 class MLEXPORT MultiThreadedPageRequestProcessor : public PageRequestProcessorBase
00029 {
00030 public:
        //! Constructs the processor; \p host may be omitted and then defaults to NULL.
00031   MultiThreadedPageRequestProcessor(Host* host = NULL);
        //! Destructor.
        //! NOTE(review): the class holds raw pointers (_producerThread, _ioThread,
        //! _workerThreads); presumably they are released here - confirm in the .cpp.
00032   ~MultiThreadedPageRequestProcessor();
00033 
        //! Always returns true for this processor.
00035   virtual bool supportsMultiThreading() { return true; }
00036 
        //! Sets the number of worker threads to \p threads.
        //! Presumably stored in _numDesiredWorkerThreads - confirm in the implementation.
00038   virtual void setNumWorkerThreads(int threads);
00039 
        //! Hands the root tile request \p tileRequest to this processor.
        //! NOTE(review): ownership of \p tileRequest is not visible from this header.
00041   virtual void addRootTileRequest(TileRequest* tileRequest);
00042 
        //! Processes all pending work (presumably blocks until everything is done - confirm).
00044   void processAll();
00045 
        //! Processes pending work limited by \p timeBudget.
        //! NOTE(review): the unit of \p timeBudget is not visible here - confirm.
00047   void process(double timeBudget);
00048 
        //! Returns whether processing is needed (presumably: pending requests exist - confirm).
00050   bool needsProcessing();
00051 
        //! Appends the page request \p request (target queue is chosen by the implementation).
00053   virtual void append(PageRequest* request);
00054 
        //! Called when a processing scope is entered; counterpart of leaveProcessingScope().
00056   virtual void enterProcessingScope();
        //! Called when a processing scope is left; counterpart of enterProcessingScope().
00058   virtual void leaveProcessingScope();
00059 
00060 private:
        //! Forward declaration; the struct is defined further below in this class.
00061   struct WorkerThreadState;
00062 
        //! Processes the next request for the GUI thread, with \p millisecondsToWait as wait time.
        //! NOTE(review): the meaning of the bool result is not visible here - confirm.
00064   bool processNextGUIRequest(int millisecondsToWait);
00065 
        //! Processes the next request for the worker described by \p state,
        //! using \p perThreadStorage as that thread's storage.
00067   void processNextWorkerRequest(WorkerThreadState* state, PerThreadStorage& perThreadStorage);
00068 
        //! Processes the single page request \p request and returns an MLErrorCode.
        //! \p isGUIThread tells whether the call happens on the GUI thread.
00070   MLErrorCode processRequest(PageRequest* request, bool isGUIThread, PerThreadStorage& perThreadStorage);
00071 
        //! Static callback with an opaque \p data pointer.
        //! Presumably \p data is the processor instance and the call forwards to
        //! rootTileRequestFinished() - confirm in the implementation.
00073   static void rootTileRequestFinishedCB(void* data, TileRequest* request);
00074 
        //! Member-function counterpart of rootTileRequestFinishedCB() for \p request.
00076   void rootTileRequestFinished(TileRequest* request);
00077 
        //! Loop of the page request producer (presumably the body of _producerThread - confirm).
00079   void runPageRequestProducerLoop();
00080 
        //! Loop of a worker thread operating on \p state.
00082   void runWorkerThreadLoop(WorkerThreadState* state);
00083   
        //! Notifies all threads (presumably via the wait conditions declared below - confirm).
00085   void notifyAllThreads();
00086 
        //! Launches the worker threads.
00088   void launchWorkerThreads();
00089   
        //! Returns the current number of page requests in the queues.
00091   int getCurrentNumberOfPageRequestsInQueues();
00092 
        //! Wakes up the producer if it is waiting for the queue (see _producerWaitingForQueue).
00094   void wakeupProducerIfWaitingForQueue();
00095  
        //! Handles errors in the GUI thread (virtual; presumably overrides the base class - confirm).
00097   virtual void handleErrorsInGUIThread();
00098 
        //! Stops processing on all threads (presumably related to _processingAllowed and
        //! _stoppedProcessing - confirm).
00100   void stopProcessingOnAllThreads();
00101 
        //! Fills \p queues with pointers to the page request queues of this processor.
00103   virtual void getAllPageRequestQueues(std::vector< std::deque<PageRequest*>* >& queues);
00104   
        //! Shutdown flag of the processor (presumably tells the threads to terminate - confirm).
00106   bool _shutdown;
00107 
        //! Flag: the producer is working.
00109   bool _producerWorking;
        //! Count of worker threads that are working.
00111   int  _workerThreadsWorking;
        //! Maximum number of page requests in the queues.
        //! NOTE(review): how the limit is enforced is not visible here.
00114   int _maxPageRequestsInQueues;
        //! Flag: the producer is waiting for the queue
        //! (see wakeupProducerIfWaitingForQueue()).
00117   bool _producerWaitingForQueue;
00118 
        //! Flag: processing is allowed.
00120   bool _processingAllowed;
00121 
        //! Wait condition related to stopped processing (see stopProcessingOnAllThreads()).
00123   WaitCondition _stoppedProcessing;
00124 
        //! Wait conditions, one each for the GUI thread, the producer, the workers
        //! and the I/O thread (judging by the names).
00126   WaitCondition _guiThreadWakeup;
00127   WaitCondition _producerWakeup;
00128   WaitCondition _workerWakeup;
00129   WaitCondition _ioThreadWakeup;
00130 
        //! Mutex of this processor (presumably guards the shared state above - confirm).
00132   Mutex _mutex;
00133 
        //! New page request cursors (presumably not yet taken over by the producer - confirm).
00135   std::vector<PageRequestCursor*> _newCursors;
00136 
        //! Work queues of page requests: one for parallel work, one for I/O work.
00138   std::deque<PageRequest*> _parallelWorkQueue;
00139   std::deque<PageRequest*> _ioWorkQueue;
00140 
        //! State kept per worker thread; constructor and destructor are defined out of line.
00142   struct WorkerThreadState
00143   {
00144     WorkerThreadState();
00145     ~WorkerThreadState();
00146 
          //! The boost thread of this worker (raw pointer; ownership not visible here).
00148     boost::thread* _thread;
00149 
          //! Flag: this worker is working.
00151     bool _working;
          //! Id of this worker thread.
00153     int  _workerThreadId;
          //! Time line of this worker in the ProcessingTimeLine.
00155     ProcessingTimeLine::TimeLine* _timeLine;
00156 
          //! Pointer to the wait condition used to wake this worker
          //! (presumably one of the processor's *Wakeup members - confirm).
00158     WaitCondition* _wakeup;
          //! Pointer to the queue this worker takes its requests from
          //! (presumably _parallelWorkQueue or _ioWorkQueue - confirm).
00160     std::deque<PageRequest*>* _queue;
00161 
          //! Shutdown flag of this individual worker.
00163     bool _shutdown;
00164 
          //! Timer used for profiling.
00166     TimeCounter _profilingTimer;
00167   };
00168 
        //! Desired number of worker threads (see setNumWorkerThreads()).
00170   int _numDesiredWorkerThreads;
00171 
        //! States of the worker threads.
00173   std::vector<WorkerThreadState*> _workerThreads;
00174 
        //! State of the I/O thread.
00176   WorkerThreadState* _ioThread;
00177 
        //! The producer thread (raw pointer; ownership not visible here).
00179   boost::thread* _producerThread;
00180 
        //! Time line of the producer thread in the ProcessingTimeLine.
00182   ProcessingTimeLine::TimeLine* _producerThreadTimeLine;
00183 
        //! Profiling record; the constructor initializes both fields to zero.
00185   struct ProfilingInfo {
00186     ProfilingInfo() {
00187       count = 0;
00188       elapsedTime = 0;
00189     }
00190 
          //! Counter, starts at 0.
00192     int count;
00193 
          //! Elapsed time, starts at 0. NOTE(review): unit is not visible here - confirm.
00195     double elapsedTime;
00196   };
00197 
        //! Profiling records keyed by PagedImage pointer.
        //! NOTE(review): this header uses std::map, std::deque and std::vector but does not
        //! include <map>, <deque> or <vector> itself; they presumably arrive through the
        //! ML headers included at the top - confirm.
00199   std::map<PagedImage*, ProfilingInfo> _profilingData;
00200 
        //! Adds the thread times to the ML profiling (presumably from _profilingData and
        //! the workers' _profilingTimer - confirm).
00202   void addThreadTimesToMLProfiling();
00203 
00204 };
00205 
00206 ML_END_NAMESPACE
00207 
00208 #endif
00209