231 lines
6.2 KiB
C++
231 lines
6.2 KiB
C++
|
|
#include "./coroutine.hpp"
|
|
|
|
#include <array>
|
|
#include <cstdio>
|
|
#include "../util/os.hpp"
|
|
|
|
namespace mijin
|
|
{
|
|
|
|
//
|
|
// internal defines
|
|
//
|
|
|
|
//
|
|
// internal constants
|
|
//
|
|
|
|
//
|
|
// internal types
|
|
//
|
|
|
|
//
|
|
// internal variables
|
|
//
|
|
|
|
namespace impl
|
|
{
|
|
// Per-thread pointer to the task that is currently being ticked on this thread, or nullptr if none is.
// MultiThreadedTaskLoop::workerThread() sets it right before tickTask() and resets it right after;
// MultiThreadedTaskLoop::transferCurrentTask() reads it to find the task that should be handed over.
thread_local TaskLoop::StoredTask* gCurrentTask = nullptr;
|
|
}
|
|
|
|
//
|
|
// internal functions
|
|
//
|
|
|
|
// Entry point of the manager thread.
//
// Until a stop is requested on stopToken, each iteration
//   1. drops parked entries that hold no task or whose task has finished,
//   2. moves resumable parked tasks into readyTasks_ (the queue the worker threads pop from),
//   3. drains queuedTasks_ (filled by addStoredTask()) into readyTasks_, parking whatever does not fit,
//   4. drains returningTasks_ (filled by the worker threads), re-queueing or parking the live tasks,
// and then sleeps for one millisecond.
//
// parkedTasks_ is only touched from within this function in the code visible here.
void MultiThreadedTaskLoop::managerThread(std::stop_token stopToken) // NOLINT(performance-unnecessary-value-param)
{
    setCurrentThreadName("Task Manager");

    // an entry is dead if it no longer owns a task (e.g. it was transferred away) or the task is done
    const auto isDead = [](StoredTask& stored) {
        return !stored.task || stored.task->status() == TaskStatus::FINISHED;
    };

    while (!stopToken.stop_requested())
    {
        // step 1: forget about dead parked entries
        auto firstDead = std::remove_if(parkedTasks_.begin(), parkedTasks_.end(), isDead);
        parkedTasks_.erase(firstDead, parkedTasks_.end());

        // step 2: hand resumable parked tasks over to the workers
        auto parked = parkedTasks_.begin();
        while (parked != parkedTasks_.end())
        {
            if (parked->task->canResume())
            {
                if (!readyTasks_.tryPushMaybeMove(*parked)) {
                    break; // push was refused, the task stays parked; try again next iteration
                }
                parked = parkedTasks_.erase(parked);
            }
            else
            {
                ++parked; // not resumable yet, leave it parked
            }
        }

        // step 3: take in newly added tasks
        while (std::optional<StoredTask> incoming = queuedTasks_.tryPop())
        {
            // prefer handing it straight to the workers, park it only if that fails
            if (!readyTasks_.tryPushMaybeMove(*incoming)) {
                parkedTasks_.push_back(std::move(*incoming));
            }
        }

        // step 4: take back whatever the workers have ticked
        while (std::optional<StoredTask> returned = returningTasks_.tryPop())
        {
            if (returned->task == nullptr || returned->task->status() == TaskStatus::FINISHED) {
                continue; // transferred to another loop or finished, nothing left to schedule
            }

            const bool requeued = returned->task->canResume() && readyTasks_.tryPushMaybeMove(*returned);
            if (!requeued) {
                // cannot run right now (or could not be pushed), keep it for a later iteration
                parkedTasks_.push_back(std::move(*returned));
            }
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}
|
|
|
|
// Entry point of a worker thread.
//
// Registers this loop as the thread's current loop, names the thread "Task Worker <workerId>" and then, until a
// stop is requested on stopToken, keeps popping tasks from readyTasks_, ticking them and pushing them to
// returningTasks_. When no task is available the thread sleeps for one millisecond before retrying.
void MultiThreadedTaskLoop::workerThread(std::stop_token stopToken, std::size_t workerId) // NOLINT(performance-unnecessary-value-param)
{
    currentLoopStorage() = this; // never reset, this thread serves this loop for its whole lifetime

    // fixed 16 byte buffer; snprintf truncates (and still terminates) names that would not fit
    std::array<char, 16> nameBuffer;
    (void) std::snprintf(nameBuffer.data(), nameBuffer.size(), "Task Worker %lu", static_cast<unsigned long>(workerId));
    setCurrentThreadName(nameBuffer.data());

    while (!stopToken.stop_requested())
    {
        if (std::optional<StoredTask> fetched = readyTasks_.tryPop(); fetched.has_value())
        {
            // expose the task via gCurrentTask only while it is being ticked
            impl::gCurrentTask = &*fetched;
            tickTask(*fetched);
            impl::gCurrentTask = nullptr;

            // always hand it back, the manager thread sorts out finished/transferred tasks
            returningTasks_.push(std::move(*fetched));
        }
        else
        {
            // nothing to do right now, back off briefly
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }
}
|
|
|
|
//
|
|
// public functions
|
|
//
|
|
|
|
// Moves the task referenced by currentTask_ out of this loop and hands it to otherLoop.
//
// Does nothing (besides the thread check) when otherLoop is this loop. Asserts fatally when currentTask_ does
// not reference a task, i.e. when it equals tasks_.end().
void SimpleTaskLoop::transferCurrentTask(TaskLoop& otherLoop) noexcept
{
    assertCorrectThread();

    if (&otherLoop != this)
    {
        MIJIN_ASSERT_FATAL(currentTask_ != tasks_.end(), "Trying to call transferCurrentTask() while not running a task!");

        // take the task out of our own slot ...
        StoredTask detached = std::move(*currentTask_);
        currentTask_->task = nullptr; // explicitly empty the moved-from slot

        // ... and let the other loop adopt it
        otherLoop.addStoredTask(std::move(detached));
    }
}
|
|
|
|
// Adopts storedTask into this loop.
//
// The task's loop is set to this loop first. Where the task is stored then depends on the calling thread:
//   - a different thread than threadId_: it is pushed to queuedTasks_,
//   - threadId_ is unset or is the calling thread: it is appended to newTasks_ if currentLoopStorage() is
//     non-null, otherwise directly to tasks_.
void SimpleTaskLoop::addStoredTask(StoredTask&& storedTask) noexcept
{
    storedTask.task->setLoop(this);

    const bool callerIsLoopThread = threadId_ == std::thread::id() || threadId_ == std::this_thread::get_id();
    if (!callerIsLoopThread)
    {
        // coming from another thread, go through the queue instead of touching the task lists
        queuedTasks_.push(std::move(storedTask));
        return;
    }

    if (currentLoopStorage() == nullptr)
    {
        // no loop is running on this thread, tasks_ can be extended directly
        tasks_.push_back(std::move(storedTask));
        return;
    }

    // a loop is currently running here, so tasks_ must not be appended to; stage the task in newTasks_
    newTasks_.push_back(std::move(storedTask));
}
|
|
|
|
std::size_t SimpleTaskLoop::getActiveTasks() const noexcept
|
|
{
|
|
std::size_t sum = 0;
|
|
for (const StoredTask& task : mijin::chain(tasks_, newTasks_))
|
|
{
|
|
const TaskStatus status = task.task ? task.task->status() : TaskStatus::FINISHED;
|
|
if (status == TaskStatus::SUSPENDED || status == TaskStatus::RUNNING)
|
|
{
|
|
++sum;
|
|
}
|
|
}
|
|
return sum;
|
|
}
|
|
|
|
// Moves the task that is currently being ticked on the calling thread (impl::gCurrentTask) to otherLoop.
//
// Does nothing when otherLoop is this loop. Asserts fatally when no task is running on the calling thread.
// The emptied entry is left behind for the worker thread, which still returns it to the manager afterwards.
void MultiThreadedTaskLoop::transferCurrentTask(TaskLoop& otherLoop) noexcept
{
    const bool alreadyHere = (&otherLoop == this);
    if (alreadyHere) {
        return;
    }

    MIJIN_ASSERT_FATAL(impl::gCurrentTask != nullptr, "Trying to call transferCurrentTask() while not running a task!");

    // give up ownership: move the task out and explicitly empty the slot the worker still holds
    StoredTask detached = std::move(*impl::gCurrentTask);
    impl::gCurrentTask->task = nullptr;

    // the receiving loop takes it from here
    otherLoop.addStoredTask(std::move(detached));
}
|
|
|
|
// Adopts storedTask into this loop: sets the task's loop to this loop and pushes it to queuedTasks_,
// from where the manager thread picks it up.
void MultiThreadedTaskLoop::addStoredTask(StoredTask&& storedTask) noexcept
{
    StoredTask& adopted = storedTask;
    adopted.task->setLoop(this);

    // No special case for being called on the manager thread itself; that is assumed not to happen.
    queuedTasks_.push(std::move(adopted));
}
|
|
|
|
void MultiThreadedTaskLoop::start(std::size_t numWorkerThreads)
|
|
{
|
|
managerThread_ = std::jthread([this](std::stop_token stopToken) { managerThread(std::move(stopToken)); });
|
|
workerThreads_.reserve(numWorkerThreads);
|
|
for (std::size_t workerId = 0; workerId < numWorkerThreads; ++workerId) {
|
|
workerThreads_.emplace_back([this, workerId](std::stop_token stopToken) { workerThread(std::move(stopToken), workerId); });
|
|
}
|
|
}
|
|
|
|
void MultiThreadedTaskLoop::stop()
|
|
{
|
|
workerThreads_.clear(); // will also set the stop token
|
|
managerThread_ = {}; // this too
|
|
}
|
|
|
|
} // namespace mijin
|