Line data Source code
1 : /////////////////////////////////////////////////////////////////////////// 2 : // Spetabaru - Berenger Bramas MPCDF - 2017 3 : // Under LGPL Licence, please you must read the LICENCE file. 4 : /////////////////////////////////////////////////////////////////////////// 5 : #ifndef SPTASKMANAGER_HPP 6 : #define SPTASKMANAGER_HPP 7 : 8 : #include <functional> 9 : #include <list> 10 : #include <unordered_map> 11 : #include <memory> 12 : #include <unistd.h> 13 : #include <fstream> 14 : #include <cmath> 15 : #include <atomic> 16 : 17 : #include "Utils/SpUtils.hpp" 18 : #include "Task/SpAbstractTask.hpp" 19 : #include "Task/SpPriority.hpp" 20 : #include "Utils/SpTimePoint.hpp" 21 : #include "SpSimpleScheduler.hpp" 22 : #include "SpPrioScheduler.hpp" 23 : #include "Utils/small_vector.hpp" 24 : #include "Compute/SpComputeEngine.hpp" 25 : #include "SpTaskManagerListener.hpp" 26 : 27 : //! The runtime is the main component of spetabaru. 28 : class SpTaskManager{ 29 : 30 : std::atomic<SpComputeEngine*> ce; 31 : 32 : //! To protect tasksFinished 33 : std::mutex mutexFinishedTasks; 34 : 35 : //! List of tasks finished 36 : std::list<SpAbstractTask*> tasksFinished; 37 : 38 : //! To wait all tasks to be over 39 : std::condition_variable conditionAllTasksOver; 40 : 41 : //! Number of currently running tasks 42 : std::atomic<int> nbRunningTasks; 43 : 44 : //! Number of added tasks 45 : std::atomic<int> nbPushedTasks; 46 : 47 : //! Number of tasks that are ready 48 : std::atomic<int> nbReadyTasks; 49 : 50 : //! Number of finished tasks 51 : std::atomic<int> nbFinishedTasks; 52 : 53 : //! To protect commutative data accesses 54 : std::mutex mutexCommutative; 55 : 56 : small_vector<SpAbstractTask*> readyTasks; 57 : 58 : template <const bool isNotCalledInAContextOfTaskCreation> 59 2398 : void insertIfReady(SpAbstractTask* aTask){ 60 2398 : if(aTask->isState(SpTaskState::WAITING_TO_BE_READY)){ 61 2371 : aTask->takeControl(); 62 2371 : if(aTask->isState(SpTaskState::WAITING_TO_BE_READY)){ 63 2319 : SpDebugPrint() << "Is waiting to be ready " << aTask->getId(); 64 2319 : const bool hasCommutativeAccessMode = aTask->hasMode(SpDataAccessMode::COMMUTATIVE_WRITE); 65 2319 : if(hasCommutativeAccessMode){ 66 62 : mutexCommutative.lock(); 67 : } 68 2319 : if(aTask->dependencesAreReady()){ 69 1073 : aTask->useDependences(); 70 1072 : if(hasCommutativeAccessMode){ 71 14 : mutexCommutative.unlock(); 72 : } 73 1072 : SpDebugPrint() << "Was not in ready list " << aTask->getId(); 74 : 75 1073 : nbReadyTasks++; 76 : 77 1073 : aTask->setState(SpTaskState::READY); 78 1073 : aTask->releaseControl(); 79 : 80 1073 : auto l = listener.load(); 81 : 82 1073 : if(l) { 83 1070 : l->thisTaskIsReady(aTask, isNotCalledInAContextOfTaskCreation); 84 : } 85 : 86 1073 : if(!ce) { 87 4 : readyTasks.push_back(aTask); 88 : } else { 89 1069 : ce.load()->pushTask(aTask); 90 : } 91 : } 92 : else{ 93 1246 : SpDebugPrint() << " not ready yet " << aTask->getId(); 94 1246 : aTask->releaseControl(); 95 1246 : if(hasCommutativeAccessMode){ 96 48 : mutexCommutative.unlock(); 97 : } 98 : } 99 : } 100 : else{ 101 52 : aTask->releaseControl(); 102 : } 103 : } 104 2398 : } 105 : 106 : /////////////////////////////////////////////////////////////////////////////////////// 107 : std::atomic<SpTaskManagerListener*> listener; 108 : 109 : public: 110 : 111 51 : void setListener(SpTaskManagerListener* inListener){ 112 51 : listener = inListener; 113 51 : } 114 : 115 : /////////////////////////////////////////////////////////////////////////////////////// 116 : 117 52 : explicit SpTaskManager() : ce(nullptr), nbRunningTasks(0), nbPushedTasks(0), nbReadyTasks(0), nbFinishedTasks(0), listener(nullptr) {} 118 : 119 : // No copy or move 120 : SpTaskManager(const SpTaskManager&) = delete; 121 : SpTaskManager(SpTaskManager&&) = delete; 122 : SpTaskManager& operator=(const SpTaskManager&) = delete; 123 : SpTaskManager& operator=(SpTaskManager&&) = delete; 124 : 125 52 : ~SpTaskManager(){ 126 52 : waitAllTasks(); 127 : 128 : // Delete tasks 129 1125 : for(auto ptrTask : tasksFinished){ 130 1073 : delete ptrTask; 131 : } 132 52 : } 133 : 134 35 : const std::list<SpAbstractTask*>& getFinishedTaskList() const{ 135 35 : return tasksFinished; 136 : } 137 : 138 : int getNbRunningTasks() const{ 139 : return nbRunningTasks; 140 : } 141 : 142 : //! Wait all tasks to be finished 143 215 : void waitAllTasks(){ 144 : { 145 430 : std::unique_lock<std::mutex> locker(mutexFinishedTasks); 146 215 : SpDebugPrint() << "Waiting for " << tasksFinished.size() << " to finish over " << nbPushedTasks << " created tasks"; 147 1160 : conditionAllTasksOver.wait(locker, [&](){return static_cast<long int>(tasksFinished.size()) == nbPushedTasks;}); 148 : } 149 : 150 215 : assert(nbRunningTasks == 0); 151 215 : } 152 : 153 : //! Wait until windowSize or less tasks are pending 154 167 : void waitRemain(const long int windowSize){ 155 334 : std::unique_lock<std::mutex> locker(mutexFinishedTasks); 156 167 : SpDebugPrint() << "Waiting for " << tasksFinished.size() << " to finish over " << nbPushedTasks << " created tasks"; 157 344 : conditionAllTasksOver.wait(locker, [&](){return nbPushedTasks - static_cast<long int>(tasksFinished.size()) <= windowSize;}); 158 167 : } 159 : 160 : 161 1073 : void addNewTask(SpAbstractTask* newTask){ 162 1073 : nbPushedTasks++; 163 1073 : insertIfReady<false>(newTask); 164 1073 : } 165 : 166 0 : int getNbReadyTasks() const{ 167 0 : return nbReadyTasks; 168 : } 169 : 170 52 : void setComputeEngine(SpComputeEngine* inCe) { 171 52 : if(inCe && !ce) { 172 52 : ce = inCe; 173 52 : ce.load()->pushTasks(readyTasks); 174 52 : readyTasks.clear(); 175 : } 176 52 : } 177 : 178 1073 : void preTaskExecution(SpAbstractTask* t) { 179 1073 : nbReadyTasks--; 180 1073 : nbRunningTasks += 1; 181 1073 : t->takeControl(); 182 : 183 1073 : SpDebugPrint() << "Execute task with ID " << t->getId(); 184 1073 : assert(t->isState(SpTaskState::READY)); 185 : 186 1073 : t->setState(SpTaskState::RUNNING); 187 1073 : } 188 : 189 1073 : void postTaskExecution(SpAbstractTask* t){ 190 1073 : t->setState(SpTaskState::POST_RUN); 191 : 192 2146 : small_vector<SpAbstractTask*> candidates; 193 1073 : t->releaseDependences(&candidates); 194 : 195 1073 : SpDebugPrint() << "Proceed candidates from after " << t->getId() << ", they are " << candidates.size(); 196 2398 : for(auto otherId : candidates){ 197 1325 : SpDebugPrint() << "Test " << otherId->getId(); 198 1325 : insertIfReady<true>(otherId); 199 : } 200 : 201 1073 : t->setState(SpTaskState::FINISHED); 202 1073 : t->releaseControl(); 203 : 204 1073 : nbRunningTasks--; 205 : 206 : // We save all of the following values because the SpTaskManager 207 : // instance might get destroyed as soon as the mutex (mutexFinishedTasks) 208 : // protected region below has been executed. 209 1073 : auto previousCntVal = nbFinishedTasks.fetch_add(1); 210 1073 : auto nbPushedTasksVal = nbPushedTasks.load(); 211 1073 : SpComputeEngine *saveCe = ce.load(); 212 : 213 : { 214 : // In this case the lock on mutexFinishedTasks should be held 215 : // while doing the notify on conditionAllTasksOver 216 : // (conditionAllTasksOver.notify_one()) because we don't want 217 : // the condition variable to get destroyed before we were able 218 : // to notify. 219 2146 : std::unique_lock<std::mutex> locker(mutexFinishedTasks); 220 1073 : tasksFinished.emplace_back(t); 221 : 222 : // We notify conditionAllTasksOver every time because of 223 : // waitRemain 224 1073 : conditionAllTasksOver.notify_one(); 225 : } 226 : 227 1073 : if(nbPushedTasksVal == (previousCntVal + 1)) { 228 98 : saveCe->wakeUpWaitingWorkers(); 229 : } 230 1073 : } 231 : 232 14 : const SpComputeEngine* getComputeEngine() const { 233 14 : return ce; 234 : } 235 : 236 0 : bool isFinished() const { 237 0 : return nbFinishedTasks == nbPushedTasks; 238 : } 239 : }; 240 : 241 : 242 : #endif