LCOV - code coverage report
Current view: top level - Src/Scheduler - SpTaskManager.hpp (source / functions) Hit Total Coverage
Test: Coverage example Lines: 90 94 95.7 %
Date: 2021-12-02 17:21:05 Functions: 15 17 88.2 %

          Line data    Source code
       1             : ///////////////////////////////////////////////////////////////////////////
       2             : // Spetabaru - Berenger Bramas MPCDF - 2017
       3             : // Under LGPL Licence, please you must read the LICENCE file.
       4             : ///////////////////////////////////////////////////////////////////////////
       5             : #ifndef SPTASKMANAGER_HPP
       6             : #define SPTASKMANAGER_HPP
       7             : 
       8             : #include <functional>
       9             : #include <list>
      10             : #include <unordered_map>
      11             : #include <memory>
      12             : #include <unistd.h>
      13             : #include <fstream>
      14             : #include <cmath>
      15             : #include <atomic>
      16             : 
      17             : #include "Utils/SpUtils.hpp"
      18             : #include "Task/SpAbstractTask.hpp"
      19             : #include "Task/SpPriority.hpp"
      20             : #include "Utils/SpTimePoint.hpp"
      21             : #include "SpSimpleScheduler.hpp"
      22             : #include "SpPrioScheduler.hpp"
      23             : #include "Utils/small_vector.hpp"
      24             : #include "Compute/SpComputeEngine.hpp"
      25             : #include "SpTaskManagerListener.hpp"
      26             : 
      27             : //! The runtime is the main component of spetabaru.
      28             : class SpTaskManager{
      29             : 
      30             :     std::atomic<SpComputeEngine*> ce;
      31             : 
      32             :     //! To protect tasksFinished
      33             :     std::mutex mutexFinishedTasks;
      34             :     
      35             :     //! List of tasks finished
      36             :     std::list<SpAbstractTask*> tasksFinished;
      37             :     
      38             :     //! To wait all tasks to be over
      39             :     std::condition_variable conditionAllTasksOver;
      40             : 
      41             :     //! Number of currently running tasks
      42             :     std::atomic<int> nbRunningTasks;
      43             :     
      44             :     //! Number of added tasks
      45             :     std::atomic<int> nbPushedTasks;
      46             :     
      47             :     //! Number of tasks that are ready
      48             :     std::atomic<int> nbReadyTasks;
      49             :     
      50             :     //! Number of finished tasks
      51             :     std::atomic<int> nbFinishedTasks;
      52             : 
      53             :     //! To protect commutative data accesses
      54             :     std::mutex mutexCommutative;
      55             :     
      56             :     small_vector<SpAbstractTask*> readyTasks;
      57             : 
      58             :     template <const bool isNotCalledInAContextOfTaskCreation>
      59        2398 :     void insertIfReady(SpAbstractTask* aTask){
      60        2398 :         if(aTask->isState(SpTaskState::WAITING_TO_BE_READY)){
      61        2371 :             aTask->takeControl();
      62        2371 :             if(aTask->isState(SpTaskState::WAITING_TO_BE_READY)){
      63        2319 :                 SpDebugPrint() << "Is waiting to be ready " << aTask->getId();
      64        2319 :                 const bool hasCommutativeAccessMode = aTask->hasMode(SpDataAccessMode::COMMUTATIVE_WRITE);
      65        2319 :                 if(hasCommutativeAccessMode){
      66          62 :                     mutexCommutative.lock();
      67             :                 }
      68        2319 :                 if(aTask->dependencesAreReady()){
      69        1073 :                     aTask->useDependences();
      70        1072 :                     if(hasCommutativeAccessMode){
      71          14 :                         mutexCommutative.unlock();
      72             :                     }
      73        1072 :                     SpDebugPrint() << "Was not in ready list " << aTask->getId();
      74             : 
      75        1073 :                     nbReadyTasks++;
      76             : 
      77        1073 :                     aTask->setState(SpTaskState::READY);
      78        1073 :                     aTask->releaseControl();
      79             :                     
      80        1073 :                     auto l = listener.load();
      81             :                     
      82        1073 :                     if(l) {
      83        1070 :                         l->thisTaskIsReady(aTask, isNotCalledInAContextOfTaskCreation);
      84             :                     }
      85             :                     
      86        1073 :                     if(!ce) {
      87           4 :                         readyTasks.push_back(aTask);
      88             :                     } else {
      89        1069 :                         ce.load()->pushTask(aTask);
      90             :                     }
      91             :                 }
      92             :                 else{
      93        1246 :                     SpDebugPrint() << " not ready yet " << aTask->getId();
      94        1246 :                     aTask->releaseControl();
      95        1246 :                     if(hasCommutativeAccessMode){
      96          48 :                        mutexCommutative.unlock();
      97             :                     }
      98             :                 }
      99             :             }
     100             :             else{
     101          52 :                 aTask->releaseControl();
     102             :             }
     103             :         }
     104        2398 :     }
     105             :     
     106             :     ///////////////////////////////////////////////////////////////////////////////////////
     107             :     std::atomic<SpTaskManagerListener*> listener;
     108             :     
     109             : public:
     110             :     
     111          51 :     void setListener(SpTaskManagerListener* inListener){
     112          51 :         listener = inListener;
     113          51 :     }
     114             : 
     115             :     ///////////////////////////////////////////////////////////////////////////////////////
     116             : 
     117          52 :     explicit SpTaskManager() : ce(nullptr), nbRunningTasks(0), nbPushedTasks(0), nbReadyTasks(0), nbFinishedTasks(0), listener(nullptr) {}
     118             : 
     119             :     // No copy or move
     120             :     SpTaskManager(const SpTaskManager&) = delete;
     121             :     SpTaskManager(SpTaskManager&&) = delete;
     122             :     SpTaskManager& operator=(const SpTaskManager&) = delete;
     123             :     SpTaskManager& operator=(SpTaskManager&&) = delete;
     124             : 
     125          52 :     ~SpTaskManager(){
     126          52 :         waitAllTasks();
     127             :         
     128             :         // Delete tasks
     129        1125 :         for(auto ptrTask : tasksFinished){
     130        1073 :             delete ptrTask;
     131             :         }
     132          52 :     }
     133             : 
     134          35 :     const std::list<SpAbstractTask*>& getFinishedTaskList() const{
     135          35 :         return tasksFinished;
     136             :     }
     137             : 
     138             :     int getNbRunningTasks() const{
     139             :         return nbRunningTasks;
     140             :     }
     141             : 
     142             :     //! Wait all tasks to be finished
     143         215 :     void waitAllTasks(){
     144             :         {
     145         430 :             std::unique_lock<std::mutex> locker(mutexFinishedTasks);
     146         215 :             SpDebugPrint() << "Waiting for  " << tasksFinished.size() << " to finish over " << nbPushedTasks << " created tasks";
     147        1160 :             conditionAllTasksOver.wait(locker, [&](){return static_cast<long int>(tasksFinished.size()) == nbPushedTasks;});
     148             :         }
     149             :         
     150         215 :         assert(nbRunningTasks == 0);
     151         215 :     }
     152             : 
     153             :     //! Wait until windowSize or less tasks are pending
     154         167 :     void waitRemain(const long int windowSize){
     155         334 :         std::unique_lock<std::mutex> locker(mutexFinishedTasks);
     156         167 :         SpDebugPrint() << "Waiting for  " << tasksFinished.size() << " to finish over " << nbPushedTasks << " created tasks";
     157         344 :         conditionAllTasksOver.wait(locker, [&](){return nbPushedTasks - static_cast<long int>(tasksFinished.size()) <= windowSize;});
     158         167 :     }
     159             : 
     160             : 
     161        1073 :     void addNewTask(SpAbstractTask* newTask){
     162        1073 :         nbPushedTasks++;
     163        1073 :         insertIfReady<false>(newTask);
     164        1073 :     }
     165             : 
     166           0 :     int getNbReadyTasks() const{
     167           0 :         return nbReadyTasks;
     168             :     }
     169             :     
     170          52 :     void setComputeEngine(SpComputeEngine* inCe) {
     171          52 :         if(inCe && !ce) {
     172          52 :             ce = inCe;
     173          52 :             ce.load()->pushTasks(readyTasks);
     174          52 :             readyTasks.clear();
     175             :         }
     176          52 :     }
     177             :     
     178        1073 :     void preTaskExecution(SpAbstractTask* t) {
     179        1073 :         nbReadyTasks--;
     180        1073 :         nbRunningTasks += 1;
     181        1073 :         t->takeControl();
     182             : 
     183        1073 :         SpDebugPrint() << "Execute task with ID " << t->getId();
     184        1073 :         assert(t->isState(SpTaskState::READY));
     185             : 
     186        1073 :         t->setState(SpTaskState::RUNNING);
     187        1073 :     }
     188             : 
     189        1073 :     void postTaskExecution(SpAbstractTask* t){
     190        1073 :         t->setState(SpTaskState::POST_RUN);
     191             :         
     192        2146 :         small_vector<SpAbstractTask*> candidates;
     193        1073 :         t->releaseDependences(&candidates);
     194             : 
     195        1073 :         SpDebugPrint() << "Proceed candidates from after " << t->getId() << ", they are " << candidates.size();
     196        2398 :         for(auto otherId : candidates){
     197        1325 :             SpDebugPrint() << "Test " << otherId->getId();
     198        1325 :             insertIfReady<true>(otherId);
     199             :         }
     200             :         
     201        1073 :         t->setState(SpTaskState::FINISHED);
     202        1073 :         t->releaseControl();
     203             :         
     204        1073 :         nbRunningTasks--;
     205             :         
     206             :         // We save all of the following values because the SpTaskManager
     207             :         // instance might get destroyed as soon as the mutex (mutexFinishedTasks)
     208             :         // protected region below has been executed.
     209        1073 :         auto previousCntVal = nbFinishedTasks.fetch_add(1);
     210        1073 :         auto nbPushedTasksVal = nbPushedTasks.load();
     211        1073 :         SpComputeEngine *saveCe = ce.load();
     212             :             
     213             :         {
     214             :             // In this case the lock on mutexFinishedTasks should be held
     215             :             // while doing the notify on conditionAllTasksOver
     216             :             // (conditionAllTasksOver.notify_one()) because we don't want
     217             :             // the condition variable to get destroyed before we were able
     218             :             // to notify.
     219        2146 :             std::unique_lock<std::mutex> locker(mutexFinishedTasks);
     220        1073 :             tasksFinished.emplace_back(t);
     221             :             
     222             :             // We notify conditionAllTasksOver every time because of
     223             :             // waitRemain  
     224        1073 :             conditionAllTasksOver.notify_one();
     225             :         }
     226             :         
     227        1073 :         if(nbPushedTasksVal == (previousCntVal + 1)) {
     228          98 :             saveCe->wakeUpWaitingWorkers();
     229             :         }
     230        1073 :     }
     231             :     
     232          14 :     const SpComputeEngine* getComputeEngine() const {
     233          14 :         return ce;
     234             :     }
     235             :     
     236           0 :     bool isFinished() const {
     237           0 :         return nbFinishedTasks == nbPushedTasks;
     238             :     }
     239             : };
     240             : 
     241             : 
     242             : #endif

Generated by: LCOV version 1.14