Made coroutines allocator-aware (let's hope this really works).
This commit is contained in:
@@ -26,205 +26,15 @@ namespace mijin
|
||||
|
||||
namespace impl
|
||||
{
|
||||
thread_local TaskLoop::StoredTask* gCurrentTask = nullptr;
|
||||
thread_local std::shared_ptr<TaskSharedState> gCurrentTaskState;
|
||||
}
|
||||
|
||||
//
|
||||
// internal functions
|
||||
//
|
||||
|
||||
void MultiThreadedTaskLoop::managerThread(std::stop_token stopToken) // NOLINT(performance-unnecessary-value-param)
|
||||
{
|
||||
setCurrentThreadName("Task Manager");
|
||||
|
||||
while (!stopToken.stop_requested())
|
||||
{
|
||||
// first clear out any parked tasks that are actually finished
|
||||
auto itRem = std::remove_if(parkedTasks_.begin(), parkedTasks_.end(), [](StoredTask& task) {
|
||||
return !task.task || task.task->status() == TaskStatus::FINISHED;
|
||||
});
|
||||
parkedTasks_.erase(itRem, parkedTasks_.end());
|
||||
|
||||
// then try to push any task from the buffer into the queue, if possible
|
||||
for (auto it = parkedTasks_.begin(); it != parkedTasks_.end();)
|
||||
{
|
||||
if (!it->task->canResume())
|
||||
{
|
||||
++it;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (readyTasks_.tryPushMaybeMove(*it)) {
|
||||
it = parkedTasks_.erase(it);
|
||||
}
|
||||
else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// then clear the incoming task queue
|
||||
while (true)
|
||||
{
|
||||
std::optional<StoredTask> task = queuedTasks_.tryPop();
|
||||
if (!task.has_value()) {
|
||||
break;
|
||||
}
|
||||
|
||||
// try to directly move it into the next queue
|
||||
if (readyTasks_.tryPushMaybeMove(*task)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// otherwise park it
|
||||
parkedTasks_.push_back(std::move(*task));
|
||||
}
|
||||
|
||||
// next collect tasks returning from the worker threads
|
||||
while (true)
|
||||
{
|
||||
std::optional<StoredTask> task = returningTasks_.tryPop();
|
||||
if (!task.has_value()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (task->task == nullptr || task->task->status() == TaskStatus::FINISHED) {
|
||||
continue; // task has been transferred or finished
|
||||
}
|
||||
|
||||
if (task->task->canResume() && readyTasks_.tryPushMaybeMove(*task)) {
|
||||
continue; // instantly resume, no questions asked
|
||||
}
|
||||
|
||||
// otherwise park it for future processing
|
||||
parkedTasks_.push_back(std::move(*task));
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
}
|
||||
}
|
||||
|
||||
void MultiThreadedTaskLoop::workerThread(std::stop_token stopToken, std::size_t workerId) // NOLINT(performance-unnecessary-value-param)
|
||||
{
|
||||
currentLoopStorage() = this; // forever (on this thread)
|
||||
|
||||
std::array<char, 16> threadName;
|
||||
(void) std::snprintf(threadName.data(), 16, "Task Worker %lu", static_cast<unsigned long>(workerId));
|
||||
setCurrentThreadName(threadName.data());
|
||||
|
||||
while (!stopToken.stop_requested())
|
||||
{
|
||||
// try to fetch a task to run
|
||||
std::optional<StoredTask> task = readyTasks_.tryPop();
|
||||
if (!task.has_value())
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
continue;
|
||||
}
|
||||
|
||||
// run it
|
||||
impl::gCurrentTask = &*task;
|
||||
tickTask(*task);
|
||||
impl::gCurrentTask = nullptr;
|
||||
|
||||
// and give it back
|
||||
returningTasks_.push(std::move(*task));
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// public functions
|
||||
//
|
||||
|
||||
void SimpleTaskLoop::transferCurrentTask(TaskLoop& otherLoop) MIJIN_NOEXCEPT
|
||||
{
|
||||
assertCorrectThread();
|
||||
|
||||
if (&otherLoop == this) {
|
||||
return;
|
||||
}
|
||||
|
||||
MIJIN_ASSERT_FATAL(currentTask_ != tasks_.end(), "Trying to call transferCurrentTask() while not running a task!");
|
||||
|
||||
// now start the transfer, first disown the task
|
||||
StoredTask storedTask = std::move(*currentTask_);
|
||||
currentTask_->task = nullptr; // just to be sure
|
||||
|
||||
// then send it over to the other loop
|
||||
otherLoop.addStoredTask(std::move(storedTask));
|
||||
}
|
||||
|
||||
void SimpleTaskLoop::addStoredTask(StoredTask&& storedTask) MIJIN_NOEXCEPT
|
||||
{
|
||||
storedTask.task->setLoop(this);
|
||||
if (threadId_ == std::thread::id() || threadId_ == std::this_thread::get_id())
|
||||
{
|
||||
// same thread, just copy it over
|
||||
if (currentLoopStorage() != nullptr) {
|
||||
// currently running, can't append to tasks_ directly
|
||||
newTasks_.push_back(std::move(storedTask));
|
||||
}
|
||||
else {
|
||||
tasks_.push_back(std::move(storedTask));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// other thread, better be safe
|
||||
queuedTasks_.push(std::move(storedTask));
|
||||
}
|
||||
}
|
||||
|
||||
std::size_t SimpleTaskLoop::getActiveTasks() const MIJIN_NOEXCEPT
|
||||
{
|
||||
std::size_t sum = 0;
|
||||
for (const StoredTask& task : mijin::chain(tasks_, newTasks_))
|
||||
{
|
||||
const TaskStatus status = task.task ? task.task->status() : TaskStatus::FINISHED;
|
||||
if (status == TaskStatus::SUSPENDED || status == TaskStatus::RUNNING)
|
||||
{
|
||||
++sum;
|
||||
}
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
void MultiThreadedTaskLoop::transferCurrentTask(TaskLoop& otherLoop) MIJIN_NOEXCEPT
|
||||
{
|
||||
if (&otherLoop == this) {
|
||||
return;
|
||||
}
|
||||
|
||||
MIJIN_ASSERT_FATAL(impl::gCurrentTask != nullptr, "Trying to call transferCurrentTask() while not running a task!");
|
||||
|
||||
// now start the transfer, first disown the task
|
||||
StoredTask storedTask = std::move(*impl::gCurrentTask);
|
||||
impl::gCurrentTask->task = nullptr; // just to be sure
|
||||
|
||||
// then send it over to the other loop
|
||||
otherLoop.addStoredTask(std::move(storedTask));
|
||||
}
|
||||
|
||||
void MultiThreadedTaskLoop::addStoredTask(StoredTask&& storedTask) MIJIN_NOEXCEPT
|
||||
{
|
||||
storedTask.task->setLoop(this);
|
||||
|
||||
// just assume we are not on the manager thread, as that wouldn't make sense
|
||||
queuedTasks_.push(std::move(storedTask));
|
||||
}
|
||||
|
||||
void MultiThreadedTaskLoop::start(std::size_t numWorkerThreads)
|
||||
{
|
||||
managerThread_ = std::jthread([this](std::stop_token stopToken) { managerThread(std::move(stopToken)); });
|
||||
workerThreads_.reserve(numWorkerThreads);
|
||||
for (std::size_t workerId = 0; workerId < numWorkerThreads; ++workerId) {
|
||||
workerThreads_.emplace_back([this, workerId](std::stop_token stopToken) { workerThread(std::move(stopToken), workerId); });
|
||||
}
|
||||
}
|
||||
|
||||
void MultiThreadedTaskLoop::stop()
|
||||
{
|
||||
workerThreads_.clear(); // will also set the stop token
|
||||
managerThread_ = {}; // this too
|
||||
}
|
||||
|
||||
} // namespace mijin
|
||||
|
||||
Reference in New Issue
Block a user