217 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			217 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| 
 | |
| #include "./coroutine.hpp"
 | |
| 
 | |
| #include <array>
 | |
| #include <cstdio>
 | |
| #include "../util/os.hpp"
 | |
| 
 | |
| namespace mijin
 | |
| {
 | |
| 
 | |
| //
 | |
| // internal defines
 | |
| //
 | |
| 
 | |
| //
 | |
| // internal constants
 | |
| //
 | |
| 
 | |
| //
 | |
| // internal types
 | |
| //
 | |
| 
 | |
| //
 | |
| // internal variables
 | |
| //
 | |
| 
 | |
| namespace impl
 | |
| {
 | |
| thread_local TaskLoop::StoredTask* gCurrentTask = nullptr;
 | |
| }
 | |
| 
 | |
| //
 | |
| // internal functions
 | |
| //
 | |
| 
 | |
| void MultiThreadedTaskLoop::managerThread(std::stop_token stopToken) // NOLINT(performance-unnecessary-value-param)
 | |
| {
 | |
|     setCurrentThreadName("Task Manager");
 | |
| 
 | |
|     while (!stopToken.stop_requested())
 | |
|     {
 | |
|         // first clear out any parked tasks that are actually finished
 | |
|         auto it = std::remove_if(parkedTasks_.begin(), parkedTasks_.end(), [](StoredTask& task) {
 | |
|             return !task.task || task.task->status() == TaskStatus::FINISHED;
 | |
|         });
 | |
|         parkedTasks_.erase(it, parkedTasks_.end());
 | |
| 
 | |
|         // then try to push any task from the buffer into the queue, if possible
 | |
|         for (auto it = parkedTasks_.begin(); it != parkedTasks_.end();)
 | |
|         {
 | |
|             if (!it->task->canResume())
 | |
|             {
 | |
|                 ++it;
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             if (readyTasks_.tryPushMaybeMove(*it)) {
 | |
|                 it = parkedTasks_.erase(it);
 | |
|             }
 | |
|             else {
 | |
|                 break;
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         // then clear the incoming task queue
 | |
|         while (true)
 | |
|         {
 | |
|             std::optional<StoredTask> task = queuedTasks_.tryPop();
 | |
|             if (!task.has_value()) {
 | |
|                 break;
 | |
|             }
 | |
|             
 | |
|             // try to directly move it into the next queue
 | |
|             if (readyTasks_.tryPushMaybeMove(*task)) {
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             // otherwise park it
 | |
|             parkedTasks_.push_back(std::move(*task));
 | |
|         }
 | |
| 
 | |
|         // next collect tasks returning from the worker threads
 | |
|         while (true)
 | |
|         {
 | |
|             std::optional<StoredTask> task = returningTasks_.tryPop();
 | |
|             if (!task.has_value()) {
 | |
|                 break;
 | |
|             }
 | |
| 
 | |
|             if (task->task == nullptr || task->task->status() == TaskStatus::FINISHED) {
 | |
|                 continue; // task has been transferred or finished
 | |
|             }
 | |
| 
 | |
|             if (task->task->canResume() && readyTasks_.tryPushMaybeMove(*task)) {
 | |
|                 continue; // instantly resume, no questions asked
 | |
|             }
 | |
| 
 | |
|             // otherwise park it for future processing
 | |
|             parkedTasks_.push_back(std::move(*task));
 | |
|         }
 | |
| 
 | |
|         std::this_thread::sleep_for(std::chrono::milliseconds(1));
 | |
|     }
 | |
| }
 | |
| 
 | |
| void MultiThreadedTaskLoop::workerThread(std::stop_token stopToken, std::size_t workerId) // NOLINT(performance-unnecessary-value-param)
 | |
| {
 | |
|     currentLoopStorage() = this; // forever (on this thread)
 | |
|     
 | |
|     std::array<char, 16> threadName;
 | |
|     (void) std::snprintf(threadName.data(), 16, "Task Worker %lu", static_cast<unsigned long>(workerId));
 | |
|     setCurrentThreadName(threadName.data());
 | |
| 
 | |
|     while (!stopToken.stop_requested())
 | |
|     {
 | |
|         // try to fetch a task to run
 | |
|         std::optional<StoredTask> task = readyTasks_.tryPop();
 | |
|         if (!task.has_value())
 | |
|         {
 | |
|             std::this_thread::sleep_for(std::chrono::milliseconds(1));
 | |
|             continue;
 | |
|         }
 | |
| 
 | |
|         // run it
 | |
|         impl::gCurrentTask = &*task;
 | |
|         tickTask(*task);
 | |
|         impl::gCurrentTask = nullptr;
 | |
| 
 | |
|         // and give it back
 | |
|         returningTasks_.push(std::move(*task));
 | |
|     }
 | |
| }
 | |
| 
 | |
| //
 | |
| // public functions
 | |
| //
 | |
| 
 | |
| void SimpleTaskLoop::transferCurrentTask(TaskLoop& otherLoop) noexcept
 | |
| {
 | |
|     assertCorrectThread();
 | |
| 
 | |
|     if (&otherLoop == this) {
 | |
|         return;
 | |
|     }
 | |
| 
 | |
|     MIJIN_ASSERT_FATAL(currentTask_ != tasks_.end(), "Trying to call transferCurrentTask() while not running a task!");
 | |
| 
 | |
|     // now start the transfer, first disown the task
 | |
|     StoredTask storedTask = std::move(*currentTask_);
 | |
|     currentTask_->task = nullptr; // just to be sure
 | |
| 
 | |
|     // then send it over to the other loop
 | |
|     otherLoop.addStoredTask(std::move(storedTask));
 | |
| }
 | |
| 
 | |
| void SimpleTaskLoop::addStoredTask(StoredTask&& storedTask) noexcept
 | |
| {
 | |
|     storedTask.task->setLoop(this);
 | |
|     if (threadId_ == std::thread::id() || threadId_ == std::this_thread::get_id())
 | |
|     {
 | |
|         // same thread, just copy it over
 | |
|         if (currentLoopStorage() != nullptr) {
 | |
|             // currently running, can't append to tasks_ directly
 | |
|             newTasks_.push_back(std::move(storedTask));
 | |
|         }
 | |
|         else {
 | |
|             tasks_.push_back(std::move(storedTask));
 | |
|         }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         // other thread, better be safe
 | |
|         queuedTasks_.push(std::move(storedTask));
 | |
|     }
 | |
| }
 | |
| 
 | |
| void MultiThreadedTaskLoop::transferCurrentTask(TaskLoop& otherLoop) noexcept
 | |
| {
 | |
|     if (&otherLoop == this) {
 | |
|         return;
 | |
|     }
 | |
| 
 | |
|     MIJIN_ASSERT_FATAL(currentTask_ != nullptr, "Trying to call transferCurrentTask() while not running a task!");
 | |
| 
 | |
|     // now start the transfer, first disown the task
 | |
|     StoredTask storedTask = std::move(*impl::gCurrentTask);
 | |
|     impl::gCurrentTask->task = nullptr; // just to be sure
 | |
| 
 | |
|     // then send it over to the other loop
 | |
|     otherLoop.addStoredTask(std::move(storedTask));
 | |
| }
 | |
| 
 | |
| void MultiThreadedTaskLoop::addStoredTask(StoredTask&& storedTask) noexcept
 | |
| {
 | |
|     storedTask.task->setLoop(this);
 | |
|     
 | |
|     // just assume we are not on the manager thread, as that wouldn't make sense
 | |
|     queuedTasks_.push(std::move(storedTask));
 | |
| }
 | |
| 
 | |
| void MultiThreadedTaskLoop::start(std::size_t numWorkerThreads)
 | |
| {
 | |
|     managerThread_ = std::jthread([this](std::stop_token stopToken) { managerThread(std::move(stopToken)); });
 | |
|     workerThreads_.reserve(numWorkerThreads);
 | |
|     for (std::size_t workerId = 0; workerId < numWorkerThreads; ++workerId) {
 | |
|         workerThreads_.emplace_back([this, workerId](std::stop_token stopToken) { workerThread(std::move(stopToken), workerId); });
 | |
|     }
 | |
| }
 | |
| 
 | |
| void MultiThreadedTaskLoop::stop()
 | |
| {
 | |
|     workerThreads_.clear(); // will also set the stop token
 | |
|     managerThread_ = {}; // this too
 | |
| }
 | |
| 
 | |
| } // namespace mijin
 |