ML Reference
|
00001 // **InsertLicense** code 00002 //------------------------------------------------------------------------- 00008 //------------------------------------------------------------------------- 00009 #ifndef __mlMultiThreadedPageRequestProcessor_H 00010 #define __mlMultiThreadedPageRequestProcessor_H 00011 00012 #include "mlInitSystemML.h" 00013 00014 #include "mlPageRequestProcessorBase.h" 00015 #include "mlAtomicCounter.h" 00016 #include "mlProcessingTimeLine.h" 00017 00018 #include <boost/thread.hpp> 00019 #include <mlMutex.h> 00020 #include <mlWaitCondition.h> 00021 00022 ML_START_NAMESPACE 00023 00024 //------------------------------------------------------------------------------------------- 00027 //------------------------------------------------------------------------------------------- 00028 class MLEXPORT MultiThreadedPageRequestProcessor : public PageRequestProcessorBase 00029 { 00030 public: 00031 MultiThreadedPageRequestProcessor(Host* host = NULL); 00032 ~MultiThreadedPageRequestProcessor(); 00033 00035 virtual bool supportsMultiThreading() { return true; } 00036 00038 virtual void setNumWorkerThreads(int threads); 00039 00041 virtual void addRootTileRequest(TileRequest* tileRequest); 00042 00044 void processAll(); 00045 00047 void process(double timeBudget); 00048 00050 bool needsProcessing(); 00051 00053 virtual void append(PageRequest* request); 00054 00056 virtual void enterProcessingScope(); 00058 virtual void leaveProcessingScope(); 00059 00060 private: 00061 struct WorkerThreadState; 00062 00064 bool processNextGUIRequest(int millisecondsToWait); 00065 00067 void processNextWorkerRequest(WorkerThreadState* state, PerThreadStorage& perThreadStorage); 00068 00070 MLErrorCode processRequest(PageRequest* request, bool isGUIThread, PerThreadStorage& perThreadStorage); 00071 00073 static void rootTileRequestFinishedCB(void* data, TileRequest* request); 00074 00076 void rootTileRequestFinished(TileRequest* request); 00077 00079 void runPageRequestProducerLoop(); 00080 00082 void runWorkerThreadLoop(WorkerThreadState* state); 00083 00085 void notifyAllThreads(); 00086 00088 void launchWorkerThreads(); 00089 00091 int getCurrentNumberOfPageRequestsInQueues(); 00092 00094 void wakeupProducerIfWaitingForQueue(); 00095 00097 virtual void handleErrorsInGUIThread(); 00098 00100 void stopProcessingOnAllThreads(); 00101 00103 virtual void getAllPageRequestQueues(std::vector< std::deque<PageRequest*>* >& queues); 00104 00106 bool _shutdown; 00107 00109 bool _producerWorking; 00111 int _workerThreadsWorking; 00114 int _maxPageRequestsInQueues; 00117 bool _producerWaitingForQueue; 00118 00120 bool _processingAllowed; 00121 00123 WaitCondition _stoppedProcessing; 00124 00126 WaitCondition _guiThreadWakeup; 00127 WaitCondition _producerWakeup; 00128 WaitCondition _workerWakeup; 00129 WaitCondition _ioThreadWakeup; 00130 00132 Mutex _mutex; 00133 00135 std::vector<PageRequestCursor*> _newCursors; 00136 00138 std::deque<PageRequest*> _parallelWorkQueue; 00139 std::deque<PageRequest*> _ioWorkQueue; 00140 00142 struct WorkerThreadState 00143 { 00144 WorkerThreadState(); 00145 ~WorkerThreadState(); 00146 00148 boost::thread* _thread; 00149 00151 bool _working; 00153 int _workerThreadId; 00155 ProcessingTimeLine::TimeLine* _timeLine; 00156 00158 WaitCondition* _wakeup; 00160 std::deque<PageRequest*>* _queue; 00161 00163 bool _shutdown; 00164 00166 TimeCounter _profilingTimer; 00167 }; 00168 00170 int _numDesiredWorkerThreads; 00171 00173 std::vector<WorkerThreadState*> _workerThreads; 00174 00176 WorkerThreadState* _ioThread; 00177 00179 boost::thread* _producerThread; 00180 00182 ProcessingTimeLine::TimeLine* _producerThreadTimeLine; 00183 00185 struct ProfilingInfo { 00186 ProfilingInfo() { 00187 count = 0; 00188 elapsedTime = 0; 00189 } 00190 00192 int count; 00193 00195 double elapsedTime; 00196 }; 00197 00199 std::map<PagedImage*, ProfilingInfo> _profilingData; 00200 00202 void addThreadTimesToMLProfiling(); 00203 00204 }; 00205 00206 ML_END_NAMESPACE 00207 00208 #endif 00209