Line data Source code
1 : #include "Compute/SpWorker.hpp" 2 : #include "Compute/SpComputeEngine.hpp" 3 : #include "TaskGraph/SpAbstractTaskGraph.hpp" 4 : 5 : std::atomic<long int> SpWorker::totalNbThreadsCreated = 1; 6 : 7 : thread_local SpWorker* workerForThread = nullptr; 8 : 9 157 : void SpWorker::start() { 10 157 : if(!t.joinable()) { 11 304 : t = std::thread([&]() { 12 152 : SpUtils::SetThreadId(threadId); 13 152 : SpWorker::setWorkerForThread(this); 14 : 15 152 : doLoop(nullptr); 16 152 : }); 17 : } 18 157 : } 19 : 20 437 : void SpWorker::waitOnCe(SpComputeEngine* inCe, SpAbstractTaskGraph* atg) { 21 437 : inCe->wait(*this, atg); 22 437 : } 23 : 24 152 : void SpWorker::setWorkerForThread(SpWorker *w) { 25 152 : workerForThread = w; 26 152 : } 27 : 28 0 : SpWorker* SpWorker::getWorkerForThread() { 29 0 : return workerForThread; 30 : } 31 : 32 1670 : void SpWorker::doLoop(SpAbstractTaskGraph* inAtg) { 33 1670 : while(!stopFlag.load(std::memory_order_relaxed) && (!inAtg || !inAtg->isFinished())) { 34 1518 : SpComputeEngine* saveCe = nullptr; 35 : 36 : // Using memory order acquire on ce.load to form release/acquire pair 37 : // I think we could use memory order consume as all the code that follows depends on the load of ce (through saveCe). 38 1518 : if((saveCe = ce.load(std::memory_order_acquire))) { 39 1514 : if(saveCe->areThereAnyWorkersToMigrate()) { 40 4 : if(saveCe->areWorkersToMigrateOfType(wt)) { 41 4 : auto previousNbOfWorkersToMigrate = saveCe->fetchDecNbOfWorkersToMigrate(); 42 : 43 4 : if(previousNbOfWorkersToMigrate > 0) { 44 : 45 4 : SpComputeEngine* newCe = saveCe->getCeToMigrateTo(); 46 4 : ce.store(newCe, std::memory_order_relaxed); 47 : 48 4 : auto previousMigrationSignalingCounterVal = saveCe->fetchDecMigrationSignalingCounter(); 49 : 50 4 : if(previousMigrationSignalingCounterVal == 1) { 51 2 : saveCe->notifyMigrationFinished(); 52 : } 53 : 54 4 : continue; 55 : } 56 : } 57 : } 58 : 59 1510 : if(saveCe->areThereAnyReadyTasks()){ 60 1073 : SpAbstractTask* task = saveCe->getTask(); 61 : 62 1073 : if(task) { 63 1073 : SpAbstractTaskGraph* atg = task->getAbstractTaskGraph(); 64 : 65 1073 : atg->preTaskExecution(task); 66 : 67 1073 : execute(task); 68 : 69 1073 : atg->postTaskExecution(task); 70 : 71 1073 : continue; 72 : } 73 : } 74 : 75 437 : waitOnCe(saveCe, inAtg); 76 : } else { 77 4 : idleWait(); 78 : } 79 : 80 : } 81 152 : } 82 : 83 : 84 :