214 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			214 lines
		
	
	
		
			5.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
 | 
						|
#include "./coroutine.hpp"
 | 
						|
 | 
						|
#include <array>
 | 
						|
#include <cstdio>
 | 
						|
#include "../util/os.hpp"
 | 
						|
 | 
						|
namespace mijin
 | 
						|
{
 | 
						|
 | 
						|
//
 | 
						|
// internal defines
 | 
						|
//
 | 
						|
 | 
						|
//
 | 
						|
// internal constants
 | 
						|
//
 | 
						|
 | 
						|
//
 | 
						|
// internal types
 | 
						|
//
 | 
						|
 | 
						|
//
 | 
						|
// internal variables
 | 
						|
//
 | 
						|
 | 
						|
thread_local TaskLoop::StoredTask* MultiThreadedTaskLoop::currentTask_ = nullptr;
 | 
						|
 | 
						|
//
 | 
						|
// internal functions
 | 
						|
//
 | 
						|
 | 
						|
void MultiThreadedTaskLoop::managerThread(std::stop_token stopToken) // NOLINT(performance-unnecessary-value-param)
 | 
						|
{
 | 
						|
    setCurrentThreadName("Task Manager");
 | 
						|
 | 
						|
    while (!stopToken.stop_requested())
 | 
						|
    {
 | 
						|
        // first clear out any parked tasks that are actually finished
 | 
						|
        auto it = std::remove_if(parkedTasks_.begin(), parkedTasks_.end(), [](StoredTask& task) {
 | 
						|
            return !task.task || task.task->status() == TaskStatus::FINISHED;
 | 
						|
        });
 | 
						|
        parkedTasks_.erase(it, parkedTasks_.end());
 | 
						|
 | 
						|
        // then try to push any task from the buffer into the queue, if possible
 | 
						|
        for (auto it = parkedTasks_.begin(); it != parkedTasks_.end();)
 | 
						|
        {
 | 
						|
            if (!it->task->canResume())
 | 
						|
            {
 | 
						|
                ++it;
 | 
						|
                continue;
 | 
						|
            }
 | 
						|
 | 
						|
            if (readyTasks_.tryPushMaybeMove(*it)) {
 | 
						|
                it = parkedTasks_.erase(it);
 | 
						|
            }
 | 
						|
            else {
 | 
						|
                break;
 | 
						|
            }
 | 
						|
        }
 | 
						|
 | 
						|
        // then clear the incoming task queue
 | 
						|
        while (true)
 | 
						|
        {
 | 
						|
            std::optional<StoredTask> task = queuedTasks_.tryPop();
 | 
						|
            if (!task.has_value()) {
 | 
						|
                break;
 | 
						|
            }
 | 
						|
            
 | 
						|
            // try to directly move it into the next queue
 | 
						|
            if (readyTasks_.tryPushMaybeMove(*task)) {
 | 
						|
                continue;
 | 
						|
            }
 | 
						|
 | 
						|
            // otherwise park it
 | 
						|
            parkedTasks_.push_back(std::move(*task));
 | 
						|
        }
 | 
						|
 | 
						|
        // next collect tasks returning from the worker threads
 | 
						|
        while (true)
 | 
						|
        {
 | 
						|
            std::optional<StoredTask> task = returningTasks_.tryPop();
 | 
						|
            if (!task.has_value()) {
 | 
						|
                break;
 | 
						|
            }
 | 
						|
 | 
						|
            if (task->task == nullptr || task->task->status() == TaskStatus::FINISHED) {
 | 
						|
                continue; // task has been transferred or finished
 | 
						|
            }
 | 
						|
 | 
						|
            if (task->task->canResume() && readyTasks_.tryPushMaybeMove(*task)) {
 | 
						|
                continue; // instantly resume, no questions asked
 | 
						|
            }
 | 
						|
 | 
						|
            // otherwise park it for future processing
 | 
						|
            parkedTasks_.push_back(std::move(*task));
 | 
						|
        }
 | 
						|
 | 
						|
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
void MultiThreadedTaskLoop::workerThread(std::stop_token stopToken, std::size_t workerId) // NOLINT(performance-unnecessary-value-param)
 | 
						|
{
 | 
						|
    currentLoopStorage() = this; // forever (on this thread)
 | 
						|
    
 | 
						|
    std::array<char, 16> threadName;
 | 
						|
    (void) std::snprintf(threadName.data(), 16, "Task Worker %lu", static_cast<unsigned long>(workerId));
 | 
						|
    setCurrentThreadName(threadName.data());
 | 
						|
 | 
						|
    while (!stopToken.stop_requested())
 | 
						|
    {
 | 
						|
        // try to fetch a task to run
 | 
						|
        std::optional<StoredTask> task = readyTasks_.tryPop();
 | 
						|
        if (!task.has_value())
 | 
						|
        {
 | 
						|
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
 | 
						|
            continue;
 | 
						|
        }
 | 
						|
 | 
						|
        // run it
 | 
						|
        currentTask_ = &*task;
 | 
						|
        tickTask(*task);
 | 
						|
        currentTask_ = nullptr;
 | 
						|
 | 
						|
        // and give it back
 | 
						|
        returningTasks_.push(std::move(*task));
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
//
 | 
						|
// public functions
 | 
						|
//
 | 
						|
 | 
						|
void SimpleTaskLoop::transferCurrentTask(TaskLoop& otherLoop) noexcept
 | 
						|
{
 | 
						|
    assertCorrectThread();
 | 
						|
 | 
						|
    if (&otherLoop == this) {
 | 
						|
        return;
 | 
						|
    }
 | 
						|
 | 
						|
    MIJIN_ASSERT_FATAL(currentTask_ != tasks_.end(), "Trying to call transferCurrentTask() while not running a task!");
 | 
						|
 | 
						|
    // now start the transfer, first disown the task
 | 
						|
    StoredTask storedTask = std::move(*currentTask_);
 | 
						|
    currentTask_->task = nullptr; // just to be sure
 | 
						|
 | 
						|
    // then send it over to the other loop
 | 
						|
    otherLoop.addStoredTask(std::move(storedTask));
 | 
						|
}
 | 
						|
 | 
						|
void SimpleTaskLoop::addStoredTask(StoredTask&& storedTask) noexcept
 | 
						|
{
 | 
						|
    storedTask.task->setLoop(this);
 | 
						|
    if (threadId_ == std::thread::id() || threadId_ == std::this_thread::get_id())
 | 
						|
    {
 | 
						|
        // same thread, just copy it over
 | 
						|
        if (currentLoopStorage() != nullptr) {
 | 
						|
            // currently running, can't append to tasks_ directly
 | 
						|
            newTasks_.push_back(std::move(storedTask));
 | 
						|
        }
 | 
						|
        else {
 | 
						|
            tasks_.push_back(std::move(storedTask));
 | 
						|
        }
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
        // other thread, better be safe
 | 
						|
        queuedTasks_.push(std::move(storedTask));
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
void MultiThreadedTaskLoop::transferCurrentTask(TaskLoop& otherLoop) noexcept
 | 
						|
{
 | 
						|
    if (&otherLoop == this) {
 | 
						|
        return;
 | 
						|
    }
 | 
						|
 | 
						|
    MIJIN_ASSERT_FATAL(currentTask_ != nullptr, "Trying to call transferCurrentTask() while not running a task!");
 | 
						|
 | 
						|
    // now start the transfer, first disown the task
 | 
						|
    StoredTask storedTask = std::move(*currentTask_);
 | 
						|
    currentTask_->task = nullptr; // just to be sure
 | 
						|
 | 
						|
    // then send it over to the other loop
 | 
						|
    otherLoop.addStoredTask(std::move(storedTask));
 | 
						|
}
 | 
						|
 | 
						|
void MultiThreadedTaskLoop::addStoredTask(StoredTask&& storedTask) noexcept
 | 
						|
{
 | 
						|
    storedTask.task->setLoop(this);
 | 
						|
    
 | 
						|
    // just assume we are not on the manager thread, as that wouldn't make sense
 | 
						|
    queuedTasks_.push(std::move(storedTask));
 | 
						|
}
 | 
						|
 | 
						|
void MultiThreadedTaskLoop::start(std::size_t numWorkerThreads)
 | 
						|
{
 | 
						|
    managerThread_ = std::jthread([this](std::stop_token stopToken) { managerThread(std::move(stopToken)); });
 | 
						|
    workerThreads_.reserve(numWorkerThreads);
 | 
						|
    for (std::size_t workerId = 0; workerId < numWorkerThreads; ++workerId) {
 | 
						|
        workerThreads_.emplace_back([this, workerId](std::stop_token stopToken) { workerThread(std::move(stopToken), workerId); });
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
void MultiThreadedTaskLoop::stop()
 | 
						|
{
 | 
						|
    workerThreads_.clear(); // will also set the stop token
 | 
						|
    managerThread_ = {}; // this too
 | 
						|
}
 | 
						|
 | 
						|
} // namespace mijin
 |