Compare commits

...

9 Commits

10 changed files with 762 additions and 280 deletions

View File

@ -26,205 +26,15 @@ namespace mijin
namespace impl namespace impl
{ {
thread_local TaskLoop::StoredTask* gCurrentTask = nullptr; thread_local std::shared_ptr<TaskSharedState> gCurrentTaskState;
} }
// //
// internal functions // internal functions
// //
void MultiThreadedTaskLoop::managerThread(std::stop_token stopToken) // NOLINT(performance-unnecessary-value-param)
{
setCurrentThreadName("Task Manager");
while (!stopToken.stop_requested())
{
// first clear out any parked tasks that are actually finished
auto itRem = std::remove_if(parkedTasks_.begin(), parkedTasks_.end(), [](StoredTask& task) {
return !task.task || task.task->status() == TaskStatus::FINISHED;
});
parkedTasks_.erase(itRem, parkedTasks_.end());
// then try to push any task from the buffer into the queue, if possible
for (auto it = parkedTasks_.begin(); it != parkedTasks_.end();)
{
if (!it->task->canResume())
{
++it;
continue;
}
if (readyTasks_.tryPushMaybeMove(*it)) {
it = parkedTasks_.erase(it);
}
else {
break;
}
}
// then clear the incoming task queue
while (true)
{
std::optional<StoredTask> task = queuedTasks_.tryPop();
if (!task.has_value()) {
break;
}
// try to directly move it into the next queue
if (readyTasks_.tryPushMaybeMove(*task)) {
continue;
}
// otherwise park it
parkedTasks_.push_back(std::move(*task));
}
// next collect tasks returning from the worker threads
while (true)
{
std::optional<StoredTask> task = returningTasks_.tryPop();
if (!task.has_value()) {
break;
}
if (task->task == nullptr || task->task->status() == TaskStatus::FINISHED) {
continue; // task has been transferred or finished
}
if (task->task->canResume() && readyTasks_.tryPushMaybeMove(*task)) {
continue; // instantly resume, no questions asked
}
// otherwise park it for future processing
parkedTasks_.push_back(std::move(*task));
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
void MultiThreadedTaskLoop::workerThread(std::stop_token stopToken, std::size_t workerId) // NOLINT(performance-unnecessary-value-param)
{
currentLoopStorage() = this; // forever (on this thread)
std::array<char, 16> threadName;
(void) std::snprintf(threadName.data(), 16, "Task Worker %lu", static_cast<unsigned long>(workerId));
setCurrentThreadName(threadName.data());
while (!stopToken.stop_requested())
{
// try to fetch a task to run
std::optional<StoredTask> task = readyTasks_.tryPop();
if (!task.has_value())
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
continue;
}
// run it
impl::gCurrentTask = &*task;
tickTask(*task);
impl::gCurrentTask = nullptr;
// and give it back
returningTasks_.push(std::move(*task));
}
}
// //
// public functions // public functions
// //
void SimpleTaskLoop::transferCurrentTask(TaskLoop& otherLoop) MIJIN_NOEXCEPT
{
assertCorrectThread();
if (&otherLoop == this) {
return;
}
MIJIN_ASSERT_FATAL(currentTask_ != tasks_.end(), "Trying to call transferCurrentTask() while not running a task!");
// now start the transfer, first disown the task
StoredTask storedTask = std::move(*currentTask_);
currentTask_->task = nullptr; // just to be sure
// then send it over to the other loop
otherLoop.addStoredTask(std::move(storedTask));
}
void SimpleTaskLoop::addStoredTask(StoredTask&& storedTask) MIJIN_NOEXCEPT
{
storedTask.task->setLoop(this);
if (threadId_ == std::thread::id() || threadId_ == std::this_thread::get_id())
{
// same thread, just copy it over
if (currentLoopStorage() != nullptr) {
// currently running, can't append to tasks_ directly
newTasks_.push_back(std::move(storedTask));
}
else {
tasks_.push_back(std::move(storedTask));
}
}
else
{
// other thread, better be safe
queuedTasks_.push(std::move(storedTask));
}
}
std::size_t SimpleTaskLoop::getActiveTasks() const MIJIN_NOEXCEPT
{
std::size_t sum = 0;
for (const StoredTask& task : mijin::chain(tasks_, newTasks_))
{
const TaskStatus status = task.task ? task.task->status() : TaskStatus::FINISHED;
if (status == TaskStatus::SUSPENDED || status == TaskStatus::RUNNING)
{
++sum;
}
}
return sum;
}
void MultiThreadedTaskLoop::transferCurrentTask(TaskLoop& otherLoop) MIJIN_NOEXCEPT
{
if (&otherLoop == this) {
return;
}
MIJIN_ASSERT_FATAL(impl::gCurrentTask != nullptr, "Trying to call transferCurrentTask() while not running a task!");
// now start the transfer, first disown the task
StoredTask storedTask = std::move(*impl::gCurrentTask);
impl::gCurrentTask->task = nullptr; // just to be sure
// then send it over to the other loop
otherLoop.addStoredTask(std::move(storedTask));
}
void MultiThreadedTaskLoop::addStoredTask(StoredTask&& storedTask) MIJIN_NOEXCEPT
{
storedTask.task->setLoop(this);
// just assume we are not on the manager thread, as that wouldn't make sense
queuedTasks_.push(std::move(storedTask));
}
void MultiThreadedTaskLoop::start(std::size_t numWorkerThreads)
{
managerThread_ = std::jthread([this](std::stop_token stopToken) { managerThread(std::move(stopToken)); });
workerThreads_.reserve(numWorkerThreads);
for (std::size_t workerId = 0; workerId < numWorkerThreads; ++workerId) {
workerThreads_.emplace_back([this, workerId](std::stop_token stopToken) { workerThread(std::move(stopToken), workerId); });
}
}
void MultiThreadedTaskLoop::stop()
{
workerThreads_.clear(); // will also set the stop token
managerThread_ = {}; // this too
}
} // namespace mijin } // namespace mijin

View File

@ -10,6 +10,7 @@
#endif #endif
#include <any> #include <any>
#include <chrono>
#include <coroutine> #include <coroutine>
#include <exception> #include <exception>
#include <memory> #include <memory>
@ -20,6 +21,7 @@
#include "./message_queue.hpp" #include "./message_queue.hpp"
#include "../container/optional.hpp" #include "../container/optional.hpp"
#include "../internal/common.hpp" #include "../internal/common.hpp"
#include "../memory/memutil.hpp"
#include "../util/flag.hpp" #include "../util/flag.hpp"
#include "../util/iterators.hpp" #include "../util/iterators.hpp"
#include "../util/traits.hpp" #include "../util/traits.hpp"
@ -63,9 +65,10 @@ enum class TaskStatus
template<typename T> template<typename T>
struct TaskState; struct TaskState;
template<template<typename> typename TAllocator = MIJIN_DEFAULT_ALLOCATOR>
class TaskLoop; class TaskLoop;
template<typename TResult = void> template<typename TResult = void, template<typename> typename TAllocator = MIJIN_DEFAULT_ALLOCATOR>
class TaskBase; class TaskBase;
#if MIJIN_COROUTINE_ENABLE_CANCEL #if MIJIN_COROUTINE_ENABLE_CANCEL
@ -103,6 +106,7 @@ public:
} }
inline void cancel() const MIJIN_NOEXCEPT; inline void cancel() const MIJIN_NOEXCEPT;
[[nodiscard]] inline Optional<std::source_location> getLocation() const MIJIN_NOEXCEPT;
#if MIJIN_COROUTINE_ENABLE_DEBUG_INFO #if MIJIN_COROUTINE_ENABLE_DEBUG_INFO
inline Optional<Stacktrace> getCreationStack() const MIJIN_NOEXCEPT; inline Optional<Stacktrace> getCreationStack() const MIJIN_NOEXCEPT;
#endif #endif
@ -111,6 +115,7 @@ struct TaskSharedState
{ {
std::atomic_bool cancelled_ = false; std::atomic_bool cancelled_ = false;
TaskHandle subTask; TaskHandle subTask;
std::source_location sourceLoc;
#if MIJIN_COROUTINE_ENABLE_DEBUG_INFO #if MIJIN_COROUTINE_ENABLE_DEBUG_INFO
Stacktrace creationStack_; Stacktrace creationStack_;
#endif #endif
@ -245,16 +250,61 @@ struct TaskAwaitableSuspend
} }
}; };
template<typename TTraits> namespace impl
{
template<typename T>
using default_is_valid = T::default_is_valid_t;
}
template<template<typename> typename TAllocator>
struct TaskAllocatorTraits
{
static constexpr bool default_is_valid_v = detect_or_t<std::true_type, impl::default_is_valid, TAllocator<void>>::value;
template<typename T>
static TAllocator<T> create()
{
auto taskLoop = TaskLoop<TAllocator>::currentOpt();
if (taskLoop != nullptr)
{
return TAllocator<T>(taskLoop->getAllocator());
}
return TAllocator<T>();
}
};
template<>
struct TaskAllocatorTraits<std::allocator>
{
static constexpr bool default_is_valid_v = true;
template<typename T>
static std::allocator<T> create() noexcept
{
return std::allocator<T>();
}
};
template<template<typename> typename TAllocator, typename T>
TAllocator<T> makeTaskAllocator()
{
return TaskAllocatorTraits<TAllocator>::template create<T>();
}
template<typename TTraits, template<typename> typename TAllocator = MIJIN_DEFAULT_ALLOCATOR>
struct TaskPromise : impl::TaskReturn<typename TTraits::result_t, TaskPromise<TTraits>> struct TaskPromise : impl::TaskReturn<typename TTraits::result_t, TaskPromise<TTraits>>
{ {
using handle_t = std::coroutine_handle<TaskPromise>; using handle_t = std::coroutine_handle<TaskPromise>;
using task_t = typename TTraits::task_t; using task_t = typename TTraits::task_t;
using result_t = typename TTraits::result_t; using result_t = typename TTraits::result_t;
[[no_unique_address]] TAllocator<std::max_align_t> allocator_;
TaskState<result_t> state_; TaskState<result_t> state_;
std::shared_ptr<TaskSharedState> sharedState_ = std::make_shared<TaskSharedState>(); std::shared_ptr<TaskSharedState> sharedState_;
TaskLoop* loop_ = nullptr; TaskLoop<TAllocator>* loop_ = nullptr;
explicit TaskPromise(TAllocator<std::max_align_t> allocator = makeTaskAllocator<TAllocator, std::max_align_t>()) MIJIN_NOEXCEPT_IF(std::is_nothrow_move_constructible_v<TAllocator<TaskSharedState>>)
: allocator_(std::move(allocator)), sharedState_(std::allocate_shared<TaskSharedState>(TAllocator<TaskSharedState>(allocator_))) {}
constexpr task_t get_return_object() MIJIN_NOEXCEPT { return task_t(handle_t::from_promise(*this)); } constexpr task_t get_return_object() MIJIN_NOEXCEPT { return task_t(handle_t::from_promise(*this)); }
constexpr TaskAwaitableSuspend initial_suspend() MIJIN_NOEXCEPT { return {}; } constexpr TaskAwaitableSuspend initial_suspend() MIJIN_NOEXCEPT { return {}; }
@ -271,9 +321,10 @@ struct TaskPromise : impl::TaskReturn<typename TTraits::result_t, TaskPromise<TT
// constexpr void unhandled_exception() MIJIN_NOEXCEPT {} // constexpr void unhandled_exception() MIJIN_NOEXCEPT {}
template<typename TValue> template<typename TValue>
auto await_transform(FuturePtr<TValue> future) MIJIN_NOEXCEPT auto await_transform(FuturePtr<TValue> future, std::source_location sourceLoc = std::source_location::current()) MIJIN_NOEXCEPT
{ {
MIJIN_ASSERT(loop_ != nullptr, "Cannot await future outside of a loop!"); MIJIN_ASSERT(loop_ != nullptr, "Cannot await future outside of a loop!");
sharedState_->sourceLoc = std::move(sourceLoc);
TaskAwaitableFuture<TValue> awaitable{future}; TaskAwaitableFuture<TValue> awaitable{future};
if (!awaitable.await_ready()) if (!awaitable.await_ready())
{ {
@ -287,17 +338,18 @@ struct TaskPromise : impl::TaskReturn<typename TTraits::result_t, TaskPromise<TT
} }
template<typename TResultOther> template<typename TResultOther>
auto await_transform(TaskBase<TResultOther> task) MIJIN_NOEXCEPT auto await_transform(TaskBase<TResultOther> task, std::source_location sourceLoc = std::source_location::current()) MIJIN_NOEXCEPT
{ {
MIJIN_ASSERT(loop_ != nullptr, "Cannot await another task outside of a loop!"); // NOLINT(clang-analyzer-core.UndefinedBinaryOperatorResult) MIJIN_ASSERT(loop_ != nullptr, "Cannot await another task outside of a loop!"); // NOLINT(clang-analyzer-core.UndefinedBinaryOperatorResult)
auto future = delayEvaluation<TResultOther>(loop_)->addTask(std::move(task), &sharedState_->subTask); // hackidyhack: delay evaluation of the type of loop_ as it is only forward-declared here auto future = delayEvaluation<TResultOther>(loop_)->addTaskImpl(std::move(task), &sharedState_->subTask); // hackidyhack: delay evaluation of the type of loop_ as it is only forward-declared here
return await_transform(future); return await_transform(future, std::move(sourceLoc));
} }
template<typename TFirstArg, typename TSecondArg, typename... TArgs> template<typename TFirstArg, typename TSecondArg, typename... TArgs>
auto await_transform(Signal<TFirstArg, TSecondArg, TArgs...>& signal) MIJIN_NOEXCEPT auto await_transform(Signal<TFirstArg, TSecondArg, TArgs...>& signal, std::source_location sourceLoc = std::source_location::current()) MIJIN_NOEXCEPT
{ {
auto data = std::make_shared<std::tuple<TFirstArg, TSecondArg, TArgs...>>(); auto data = std::make_shared<std::tuple<TFirstArg, TSecondArg, TArgs...>>();
sharedState_->sourceLoc = std::move(sourceLoc);
signal.connect([this, data](TFirstArg arg0, TSecondArg arg1, TArgs... args) mutable signal.connect([this, data](TFirstArg arg0, TSecondArg arg1, TArgs... args) mutable
{ {
*data = std::make_tuple(std::move(arg0), std::move(arg1), std::move(args)...); *data = std::make_tuple(std::move(arg0), std::move(arg1), std::move(args)...);
@ -309,9 +361,10 @@ struct TaskPromise : impl::TaskReturn<typename TTraits::result_t, TaskPromise<TT
} }
template<typename TFirstArg> template<typename TFirstArg>
auto await_transform(Signal<TFirstArg>& signal) MIJIN_NOEXCEPT auto await_transform(Signal<TFirstArg>& signal, std::source_location sourceLoc = std::source_location::current()) MIJIN_NOEXCEPT
{ {
auto data = std::make_shared<TFirstArg>(); auto data = std::make_shared<TFirstArg>();
sharedState_->sourceLoc = std::move(sourceLoc);
signal.connect([this, data](TFirstArg arg0) mutable signal.connect([this, data](TFirstArg arg0) mutable
{ {
*data = std::move(arg0); *data = std::move(arg0);
@ -322,8 +375,9 @@ struct TaskPromise : impl::TaskReturn<typename TTraits::result_t, TaskPromise<TT
return awaitable; return awaitable;
} }
auto await_transform(Signal<>& signal) MIJIN_NOEXCEPT auto await_transform(Signal<>& signal, std::source_location sourceLoc = std::source_location::current()) MIJIN_NOEXCEPT
{ {
sharedState_->sourceLoc = std::move(sourceLoc);
signal.connect([this]() signal.connect([this]()
{ {
state_.status = TaskStatus::SUSPENDED; state_.status = TaskStatus::SUSPENDED;
@ -333,24 +387,39 @@ struct TaskPromise : impl::TaskReturn<typename TTraits::result_t, TaskPromise<TT
return awaitable; return awaitable;
} }
std::suspend_always await_transform(std::suspend_always) MIJIN_NOEXCEPT std::suspend_always await_transform(std::suspend_always, std::source_location sourceLoc = std::source_location::current()) MIJIN_NOEXCEPT
{ {
sharedState_->sourceLoc = std::move(sourceLoc);
state_.status = TaskStatus::SUSPENDED; state_.status = TaskStatus::SUSPENDED;
return std::suspend_always(); return std::suspend_always();
} }
std::suspend_never await_transform(std::suspend_never) MIJIN_NOEXCEPT { std::suspend_never await_transform(std::suspend_never, std::source_location sourceLoc = std::source_location::current()) MIJIN_NOEXCEPT {
sharedState_->sourceLoc = std::move(sourceLoc);
return std::suspend_never(); return std::suspend_never();
} }
TaskAwaitableSuspend await_transform(TaskAwaitableSuspend) MIJIN_NOEXCEPT TaskAwaitableSuspend await_transform(TaskAwaitableSuspend, std::source_location sourceLoc = std::source_location::current()) MIJIN_NOEXCEPT
{ {
sharedState_->sourceLoc = std::move(sourceLoc);
state_.status = TaskStatus::SUSPENDED; state_.status = TaskStatus::SUSPENDED;
return TaskAwaitableSuspend(); return TaskAwaitableSuspend();
} }
// make sure the allocators are also used for the promise itself
void* operator new(std::size_t size)
{
return makeTaskAllocator<TAllocator, std::max_align_t>().allocate((size - 1) / sizeof(std::max_align_t) + 1);
}
void operator delete(void* ptr, std::size_t size) noexcept
{
TaskPromise* self = static_cast<TaskPromise*>(ptr);
self->allocator_.deallocate(static_cast<std::max_align_t*>(ptr), (size - 1) / sizeof(std::max_align_t) + 1);
}
}; };
template<typename TResult> template<typename TResult, template<typename> typename TAllocator>
class [[nodiscard("Tasks should either we awaited or added to a loop.")]] TaskBase class [[nodiscard("Tasks should either we awaited or added to a loop.")]] TaskBase
{ {
public: public:
@ -362,7 +431,7 @@ public:
using result_t = TResult; using result_t = TResult;
}; };
public: public:
using promise_type = TaskPromise<Traits>; using promise_type = TaskPromise<Traits, TAllocator>;
using handle_t = typename promise_type::handle_t; using handle_t = typename promise_type::handle_t;
private: private:
handle_t handle_; handle_t handle_;
@ -415,11 +484,11 @@ private:
[[nodiscard]] [[nodiscard]]
constexpr handle_t handle() const MIJIN_NOEXCEPT { return handle_; } constexpr handle_t handle() const MIJIN_NOEXCEPT { return handle_; }
[[nodiscard]] [[nodiscard]]
constexpr TaskLoop* getLoop() MIJIN_NOEXCEPT constexpr TaskLoop<TAllocator>* getLoop() MIJIN_NOEXCEPT
{ {
return handle_.promise().loop_; return handle_.promise().loop_;
} }
constexpr void setLoop(TaskLoop* loop) MIJIN_NOEXCEPT constexpr void setLoop(TaskLoop<TAllocator>* loop) MIJIN_NOEXCEPT
{ {
// MIJIN_ASSERT(handle_.promise().loop_ == nullptr // MIJIN_ASSERT(handle_.promise().loop_ == nullptr
// || handle_.promise().loop_ == loop // || handle_.promise().loop_ == loop
@ -427,12 +496,13 @@ private:
handle_.promise().loop_ = loop; handle_.promise().loop_ = loop;
} }
friend class TaskLoop; friend class TaskLoop<TAllocator>;
template<typename TTask> template<typename TTask, template<typename> typename TAllocator2>
friend class WrappedTask; friend class WrappedTask;
}; };
template<template<typename> typename TAllocator = MIJIN_DEFAULT_ALLOCATOR>
class WrappedTaskBase class WrappedTaskBase
{ {
public: public:
@ -444,7 +514,7 @@ public:
virtual void resume() = 0; virtual void resume() = 0;
virtual void* raw() MIJIN_NOEXCEPT = 0; virtual void* raw() MIJIN_NOEXCEPT = 0;
virtual std::coroutine_handle<> handle() MIJIN_NOEXCEPT = 0; virtual std::coroutine_handle<> handle() MIJIN_NOEXCEPT = 0;
virtual void setLoop(TaskLoop* loop) MIJIN_NOEXCEPT = 0; virtual void setLoop(TaskLoop<TAllocator>* loop) MIJIN_NOEXCEPT = 0;
virtual std::shared_ptr<TaskSharedState>& sharedState() MIJIN_NOEXCEPT = 0; virtual std::shared_ptr<TaskSharedState>& sharedState() MIJIN_NOEXCEPT = 0;
[[nodiscard]] inline bool canResume() { [[nodiscard]] inline bool canResume() {
@ -453,8 +523,8 @@ public:
} }
}; };
template<typename TTask> template<typename TTask, template<typename> typename TAllocator = MIJIN_DEFAULT_ALLOCATOR>
class WrappedTask : public WrappedTaskBase class WrappedTask : public WrappedTaskBase<TAllocator>
{ {
private: private:
TTask task_; TTask task_;
@ -480,24 +550,30 @@ public:
void resume() override { task_.resume(); } void resume() override { task_.resume(); }
void* raw() MIJIN_NOEXCEPT override { return &task_; } void* raw() MIJIN_NOEXCEPT override { return &task_; }
std::coroutine_handle<> handle() MIJIN_NOEXCEPT override { return task_.handle(); } std::coroutine_handle<> handle() MIJIN_NOEXCEPT override { return task_.handle(); }
void setLoop(TaskLoop* loop) MIJIN_NOEXCEPT override { task_.setLoop(loop); } void setLoop(TaskLoop<TAllocator>* loop) MIJIN_NOEXCEPT override { task_.setLoop(loop); }
virtual std::shared_ptr<TaskSharedState>& sharedState() MIJIN_NOEXCEPT override { return task_.sharedState(); } virtual std::shared_ptr<TaskSharedState>& sharedState() MIJIN_NOEXCEPT override { return task_.sharedState(); }
}; };
template<typename TTask> template<typename TTask, template<typename> typename TAllocator>
std::unique_ptr<WrappedTask<TTask>> wrapTask(TTask&& task) MIJIN_NOEXCEPT auto wrapTask(TAllocator<WrappedTask<TTask, TAllocator>> allocator, TTask&& task)
{ {
return std::make_unique<WrappedTask<TTask>>(std::forward<TTask>(task)); using wrapped_task_t = WrappedTask<TTask, TAllocator>;
using deleter_t = AllocatorDeleter<TAllocator<wrapped_task_t>>;
using allocator_t = TAllocator<wrapped_task_t>;
wrapped_task_t* ptr = ::new (allocator.allocate(1)) wrapped_task_t(std::forward<TTask>(task));
return std::unique_ptr<wrapped_task_t, deleter_t>(ptr, AllocatorDeleter<allocator_t>(std::move(allocator)));
} }
template<template<typename> typename TAllocator>
class TaskLoop class TaskLoop
{ {
public: public:
MIJIN_DEFINE_FLAG(CanContinue); MIJIN_DEFINE_FLAG(CanContinue);
MIJIN_DEFINE_FLAG(IgnoreWaiting); MIJIN_DEFINE_FLAG(IgnoreWaiting);
using wrapped_task_t = WrappedTaskBase; using wrapped_task_t = WrappedTaskBase<TAllocator>;
using wrapped_task_base_ptr_t = std::unique_ptr<wrapped_task_t>; using wrapped_task_base_ptr_t = std::unique_ptr<wrapped_task_t, AllocatorDeleter<TAllocator<wrapped_task_t>>>;
struct StoredTask struct StoredTask
{ {
wrapped_task_base_ptr_t task; wrapped_task_base_ptr_t task;
@ -506,31 +582,55 @@ public:
}; };
using exception_handler_t = std::function<void(std::exception_ptr)>; using exception_handler_t = std::function<void(std::exception_ptr)>;
using allocator_t = TAllocator<int>;
protected: protected:
using task_vector_t = std::vector<StoredTask>; using task_vector_t = std::vector<StoredTask, TAllocator<StoredTask>>;
template<typename TTask> template<typename TTask>
using wrapped_task_ptr_t = std::unique_ptr<WrappedTask<TTask>>; using wrapped_task_ptr_t = std::unique_ptr<WrappedTask<TTask>>;
exception_handler_t uncaughtExceptionHandler_; exception_handler_t uncaughtExceptionHandler_;
[[no_unique_address]] allocator_t allocator_;
public: public:
TaskLoop() MIJIN_NOEXCEPT = default; explicit TaskLoop(allocator_t allocator = {}) MIJIN_NOEXCEPT_IF(std::is_nothrow_move_constructible_v<allocator_t>)
: allocator_(std::move(allocator)) {};
TaskLoop(const TaskLoop&) = delete; TaskLoop(const TaskLoop&) = delete;
TaskLoop(TaskLoop&&) = delete; TaskLoop(TaskLoop&&) = delete;
virtual ~TaskLoop() MIJIN_NOEXCEPT = default; virtual ~TaskLoop() MIJIN_NOEXCEPT = default;
[[nodiscard]]
const allocator_t& getAllocator() const MIJIN_NOEXCEPT { return allocator_; }
TaskLoop& operator=(const TaskLoop&) = delete; TaskLoop& operator=(const TaskLoop&) = delete;
TaskLoop& operator=(TaskLoop&&) = delete; TaskLoop& operator=(TaskLoop&&) = delete;
void setUncaughtExceptionHandler(exception_handler_t handler) MIJIN_NOEXCEPT { uncaughtExceptionHandler_ = std::move(handler); } void setUncaughtExceptionHandler(exception_handler_t handler) MIJIN_NOEXCEPT { uncaughtExceptionHandler_ = std::move(handler); }
template<typename TResult> template<typename TResult>
inline FuturePtr<TResult> addTask(TaskBase<TResult> task, TaskHandle* outHandle = nullptr) MIJIN_NOEXCEPT; FuturePtr<TResult> addTaskImpl(TaskBase<TResult, TAllocator> task, TaskHandle* outHandle) MIJIN_NOEXCEPT;
template<typename TResult>
FuturePtr<TResult> addTask(TaskBase<TResult, TAllocator> task, TaskHandle* outHandle = nullptr) MIJIN_NOEXCEPT
{
static_assert(TaskAllocatorTraits<TAllocator>::default_is_valid_v, "Allocator is not valid when default constructed, use makeTask() instead.");
return addTaskImpl(std::move(task), outHandle);
}
template<typename TCoro, typename... TArgs>
auto makeTask(TCoro&& coro, TaskHandle& outHandle, TArgs&&... args) MIJIN_NOEXCEPT;
template<typename TCoro, typename... TArgs>
auto makeTask(TCoro&& coro, TArgs&&... args) MIJIN_NOEXCEPT
{
TaskHandle dummy;
return makeTask(std::forward<TCoro>(coro), dummy, std::forward<TArgs>(args)...);
}
virtual void transferCurrentTask(TaskLoop& otherLoop) MIJIN_NOEXCEPT = 0; virtual void transferCurrentTask(TaskLoop& otherLoop) MIJIN_NOEXCEPT = 0;
virtual void addStoredTask(StoredTask&& storedTask) MIJIN_NOEXCEPT = 0; virtual void addStoredTask(StoredTask&& storedTask) MIJIN_NOEXCEPT = 0;
[[nodiscard]] static TaskLoop& current() MIJIN_NOEXCEPT; [[nodiscard]] static TaskLoop& current() MIJIN_NOEXCEPT;
[[nodiscard]] static TaskLoop* currentOpt() MIJIN_NOEXCEPT;
protected: protected:
inline TaskStatus tickTask(StoredTask& task); inline TaskStatus tickTask(StoredTask& task);
protected: protected:
@ -542,17 +642,28 @@ protected:
template<typename TResult = void> template<typename TResult = void>
using Task = TaskBase<TResult>; using Task = TaskBase<TResult>;
class SimpleTaskLoop : public TaskLoop template<template<typename> typename TAllocator = MIJIN_DEFAULT_ALLOCATOR>
class BaseSimpleTaskLoop : public TaskLoop<TAllocator>
{ {
private: private:
using base_t = TaskLoop<TAllocator>;
using typename TaskLoop<TAllocator>::task_vector_t;
using typename TaskLoop<TAllocator>::allocator_t;
using typename TaskLoop<TAllocator>::StoredTask;
using typename TaskLoop<TAllocator>::CanContinue;
using typename TaskLoop<TAllocator>::IgnoreWaiting;
using base_t::allocator_;
task_vector_t tasks_; task_vector_t tasks_;
task_vector_t newTasks_; task_vector_t newTasks_;
task_vector_t::iterator currentTask_; task_vector_t::iterator currentTask_;
MessageQueue<StoredTask> queuedTasks_; MessageQueue<StoredTask> queuedTasks_;
std::thread::id threadId_; std::thread::id threadId_;
public:
explicit BaseSimpleTaskLoop(const allocator_t& allocator = {}) MIJIN_NOEXCEPT_IF(std::is_nothrow_copy_constructible_v<allocator_t>)
: base_t(std::move(allocator)), tasks_(TAllocator<StoredTask>(allocator_)), newTasks_(TAllocator<StoredTask>(allocator_)) {}
public: // TaskLoop implementation public: // TaskLoop implementation
void transferCurrentTask(TaskLoop& otherLoop) MIJIN_NOEXCEPT override; void transferCurrentTask(TaskLoop<TAllocator>& otherLoop) MIJIN_NOEXCEPT override;
void addStoredTask(StoredTask&& storedTask) MIJIN_NOEXCEPT override; void addStoredTask(StoredTask&& storedTask) MIJIN_NOEXCEPT override;
public: // public interface public: // public interface
@ -562,23 +673,34 @@ public: // public interface
inline CanContinue tick(); inline CanContinue tick();
inline void runUntilDone(IgnoreWaiting ignoreWaiting = IgnoreWaiting::NO); inline void runUntilDone(IgnoreWaiting ignoreWaiting = IgnoreWaiting::NO);
inline void cancelAllTasks() MIJIN_NOEXCEPT; inline void cancelAllTasks() MIJIN_NOEXCEPT;
[[nodiscard]] inline std::vector<TaskHandle> getAllTasks() const MIJIN_NOEXCEPT; [[nodiscard]] inline std::vector<TaskHandle, TAllocator<TaskHandle>> getAllTasks() const MIJIN_NOEXCEPT;
private: private:
inline void assertCorrectThread() { MIJIN_ASSERT(threadId_ == std::thread::id() || threadId_ == std::this_thread::get_id(), "Unsafe to TaskLoop from different thread!"); } inline void assertCorrectThread() { MIJIN_ASSERT(threadId_ == std::thread::id() || threadId_ == std::this_thread::get_id(), "Unsafe to TaskLoop from different thread!"); }
}; };
using SimpleTaskLoop = BaseSimpleTaskLoop<>;
class MultiThreadedTaskLoop : public TaskLoop template<template<typename> typename TAllocator = MIJIN_DEFAULT_ALLOCATOR>
class BaseMultiThreadedTaskLoop : public TaskLoop<TAllocator>
{ {
private: private:
using base_t = TaskLoop<TAllocator>;
using typename base_t::task_vector_t;
using typename base_t::allocator_t;
using typename base_t::StoredTask;
using base_t::allocator_;
task_vector_t parkedTasks_; // buffer for tasks that don't fit into readyTasks_ task_vector_t parkedTasks_; // buffer for tasks that don't fit into readyTasks_
MessageQueue<StoredTask> queuedTasks_; // tasks that should be appended to parked tasks MessageQueue<StoredTask> queuedTasks_; // tasks that should be appended to parked tasks
MessageQueue<StoredTask> readyTasks_; // task queue to send tasks to a worker thread MessageQueue<StoredTask> readyTasks_; // task queue to send tasks to a worker thread
MessageQueue<StoredTask> returningTasks_; // task that have executed on a worker thread and return for further processing MessageQueue<StoredTask> returningTasks_; // task that have executed on a worker thread and return for further processing
std::jthread managerThread_; std::jthread managerThread_;
std::vector<std::jthread> workerThreads_; std::vector<std::jthread, TAllocator<std::jthread>> workerThreads_;
public:
explicit BaseMultiThreadedTaskLoop(allocator_t allocator = {}) MIJIN_NOEXCEPT_IF(std::is_nothrow_copy_constructible_v<allocator_t>)
: base_t(std::move(allocator)), parkedTasks_(TAllocator<StoredTask>(allocator_)), workerThreads_(TAllocator<std::jthread>(allocator_)) {}
public: // TaskLoop implementation public: // TaskLoop implementation
void transferCurrentTask(TaskLoop& otherLoop) MIJIN_NOEXCEPT override; void transferCurrentTask(TaskLoop<TAllocator>& otherLoop) MIJIN_NOEXCEPT override;
void addStoredTask(StoredTask&& storedTask) MIJIN_NOEXCEPT override; void addStoredTask(StoredTask&& storedTask) MIJIN_NOEXCEPT override;
public: // public interface public: // public interface
@ -587,7 +709,10 @@ public: // public interface
private: // private stuff private: // private stuff
void managerThread(std::stop_token stopToken); void managerThread(std::stop_token stopToken);
void workerThread(std::stop_token stopToken, std::size_t workerId); void workerThread(std::stop_token stopToken, std::size_t workerId);
static thread_local StoredTask* currentTask_ = nullptr;
}; };
using MultiThreadedTaskLoop = BaseMultiThreadedTaskLoop<>;
// //
// public functions // public functions
@ -595,12 +720,12 @@ private: // private stuff
namespace impl namespace impl
{ {
extern thread_local TaskLoop::StoredTask* gCurrentTask; extern thread_local std::shared_ptr<TaskSharedState> gCurrentTaskState;
inline void throwIfCancelled() inline void throwIfCancelled()
{ {
#if MIJIN_COROUTINE_ENABLE_CANCEL #if MIJIN_COROUTINE_ENABLE_CANCEL
if (gCurrentTask->task->sharedState()->cancelled_) if (gCurrentTaskState->cancelled_)
{ {
throw TaskCancelled(); throw TaskCancelled();
} }
@ -617,6 +742,15 @@ void TaskHandle::cancel() const MIJIN_NOEXCEPT
} }
} }
Optional<std::source_location> TaskHandle::getLocation() const noexcept
{
if (std::shared_ptr<TaskSharedState> state = state_.lock())
{
return state->sourceLoc;
}
return NULL_OPTIONAL;
}
#if MIJIN_COROUTINE_ENABLE_DEBUG_INFO #if MIJIN_COROUTINE_ENABLE_DEBUG_INFO
Optional<Stacktrace> TaskHandle::getCreationStack() const MIJIN_NOEXCEPT Optional<Stacktrace> TaskHandle::getCreationStack() const MIJIN_NOEXCEPT
{ {
@ -628,8 +762,8 @@ Optional<Stacktrace> TaskHandle::getCreationStack() const MIJIN_NOEXCEPT
} }
#endif // MIJIN_COROUTINE_ENABLE_DEBUG_INFO #endif // MIJIN_COROUTINE_ENABLE_DEBUG_INFO
template<typename TResult> template<typename TResult, template<typename> typename TAllocator>
TaskBase<TResult>::~TaskBase() MIJIN_NOEXCEPT TaskBase<TResult, TAllocator>::~TaskBase() MIJIN_NOEXCEPT
{ {
if (handle_) if (handle_)
{ {
@ -637,13 +771,14 @@ TaskBase<TResult>::~TaskBase() MIJIN_NOEXCEPT
} }
} }
template<template<typename> typename TAllocator>
template<typename TResult> template<typename TResult>
inline FuturePtr<TResult> TaskLoop::addTask(TaskBase<TResult> task, TaskHandle* outHandle) MIJIN_NOEXCEPT FuturePtr<TResult> TaskLoop<TAllocator>::addTaskImpl(TaskBase<TResult, TAllocator> task, TaskHandle* outHandle) MIJIN_NOEXCEPT
{ {
MIJIN_ASSERT(!task.getLoop(), "Attempting to add task that already has a loop!"); MIJIN_ASSERT(!task.getLoop(), "Attempting to add task that already has a loop!");
task.setLoop(this); task.setLoop(this);
auto future = std::make_shared<Future<TResult>>(); auto future = std::allocate_shared<Future<TResult>>(TAllocator<Future<TResult>>(allocator_));
auto setFuture = &setFutureHelper<TResult>; auto setFuture = &setFutureHelper<TResult>;
if (outHandle != nullptr) if (outHandle != nullptr)
@ -653,7 +788,7 @@ inline FuturePtr<TResult> TaskLoop::addTask(TaskBase<TResult> task, TaskHandle*
// add tasks to a seperate vector first as we might be running another task right now // add tasks to a seperate vector first as we might be running another task right now
addStoredTask(StoredTask{ addStoredTask(StoredTask{
.task = wrapTask(std::move(task)), .task = wrapTask(TAllocator<WrappedTask<TaskBase<TResult, TAllocator>>>(allocator_), std::move(task)),
.setFuture = setFuture, .setFuture = setFuture,
.resultData = future .resultData = future
}); });
@ -661,17 +796,29 @@ inline FuturePtr<TResult> TaskLoop::addTask(TaskBase<TResult> task, TaskHandle*
return future; return future;
} }
inline TaskStatus TaskLoop::tickTask(StoredTask& task) template<template<typename> typename TAllocator>
template<typename TCoro, typename... TArgs>
auto TaskLoop<TAllocator>::makeTask(TCoro&& coro, TaskHandle& outHandle, TArgs&&... args) MIJIN_NOEXCEPT
{
TaskLoop<TAllocator>* previousLoop = currentLoopStorage();
currentLoopStorage() = this;
auto result = addTaskImpl(std::invoke(std::forward<TCoro>(coro), std::forward<TArgs>(args)...), &outHandle);
currentLoopStorage() = previousLoop;
return result;
}
template<template<typename> typename TAllocator>
TaskStatus TaskLoop<TAllocator>::tickTask(StoredTask& task)
{ {
TaskStatus status = {}; TaskStatus status = {};
impl::gCurrentTask = &task; impl::gCurrentTaskState = task.task->sharedState();
do do
{ {
task.task->resume(); task.task->resume();
status = task.task ? task.task->status() : TaskStatus::WAITING; // no inner task -> task switch context (and will be removed later) status = task.task ? task.task->status() : TaskStatus::WAITING; // no inner task -> task switch context (and will be removed later)
} }
while (status == TaskStatus::RUNNING); while (status == TaskStatus::RUNNING);
impl::gCurrentTask = nullptr; impl::gCurrentTaskState = nullptr;
#if MIJIN_COROUTINE_ENABLE_EXCEPTION_HANDLING #if MIJIN_COROUTINE_ENABLE_EXCEPTION_HANDLING
if (task.task && task.task->exception()) if (task.task && task.task->exception())
@ -706,22 +853,31 @@ inline TaskStatus TaskLoop::tickTask(StoredTask& task)
return status; return status;
} }
/* static */ inline auto TaskLoop::current() MIJIN_NOEXCEPT -> TaskLoop& template<template<typename> typename TAllocator>
/* static */ inline auto TaskLoop<TAllocator>::current() MIJIN_NOEXCEPT -> TaskLoop&
{ {
MIJIN_ASSERT(currentLoopStorage() != nullptr, "Attempting to fetch current loop while no coroutine is running!"); MIJIN_ASSERT(currentLoopStorage() != nullptr, "Attempting to fetch current loop while no coroutine is running!");
return *currentLoopStorage(); return *currentLoopStorage();
} }
/* static */ auto TaskLoop::currentLoopStorage() MIJIN_NOEXCEPT -> TaskLoop*& template<template<typename> typename TAllocator>
/* static */ inline auto TaskLoop<TAllocator>::currentOpt() MIJIN_NOEXCEPT -> TaskLoop*
{
return currentLoopStorage();
}
template<template<typename> typename TAllocator>
/* static */ auto TaskLoop<TAllocator>::currentLoopStorage() MIJIN_NOEXCEPT -> TaskLoop*&
{ {
static thread_local TaskLoop* storage = nullptr; static thread_local TaskLoop* storage = nullptr;
return storage; return storage;
} }
template<template<typename> typename TAllocator>
template<typename TResult> template<typename TResult>
/* static */ inline void TaskLoop::setFutureHelper(StoredTask& storedTask) MIJIN_NOEXCEPT /* static */ inline void TaskLoop<TAllocator>::setFutureHelper(StoredTask& storedTask) MIJIN_NOEXCEPT
{ {
TaskBase<TResult>& task = *static_cast<TaskBase<TResult>*>(storedTask.task->raw()); TaskBase<TResult, TAllocator>& task = *static_cast<TaskBase<TResult, TAllocator>*>(storedTask.task->raw());
auto future = std::any_cast<FuturePtr<TResult>>(storedTask.resultData); auto future = std::any_cast<FuturePtr<TResult>>(storedTask.resultData);
if constexpr (!std::is_same_v<TResult, void>) if constexpr (!std::is_same_v<TResult, void>)
@ -734,9 +890,10 @@ template<typename TResult>
} }
} }
inline std::suspend_always switchContext(TaskLoop& taskLoop) template<template<typename> typename TAllocator>
inline std::suspend_always switchContext(TaskLoop<TAllocator>& taskLoop)
{ {
TaskLoop& currentTaskLoop = TaskLoop::current(); TaskLoop<TAllocator>& currentTaskLoop = TaskLoop<TAllocator>::current();
if (&currentTaskLoop == &taskLoop) { if (&currentTaskLoop == &taskLoop) {
return {}; return {};
} }
@ -744,11 +901,68 @@ inline std::suspend_always switchContext(TaskLoop& taskLoop)
return {}; return {};
} }
inline auto SimpleTaskLoop::tick() -> CanContinue template<template<typename> typename TAllocator>
void BaseSimpleTaskLoop<TAllocator>::transferCurrentTask(TaskLoop<TAllocator>& otherLoop) MIJIN_NOEXCEPT
{
assertCorrectThread();
if (&otherLoop == this) {
return;
}
MIJIN_ASSERT_FATAL(currentTask_ != tasks_.end(), "Trying to call transferCurrentTask() while not running a task!");
// now start the transfer, first disown the task
StoredTask storedTask = std::move(*currentTask_);
currentTask_->task = nullptr; // just to be sure
// then send it over to the other loop
otherLoop.addStoredTask(std::move(storedTask));
}
template<template<typename> typename TAllocator>
void BaseSimpleTaskLoop<TAllocator>::addStoredTask(StoredTask&& storedTask) MIJIN_NOEXCEPT
{
storedTask.task->setLoop(this);
if (threadId_ == std::thread::id() || threadId_ == std::this_thread::get_id())
{
// same thread, just copy it over
if (TaskLoop<TAllocator>::currentLoopStorage() != nullptr) {
// currently running, can't append to tasks_ directly
newTasks_.push_back(std::move(storedTask));
}
else {
tasks_.push_back(std::move(storedTask));
}
}
else
{
// other thread, better be safe
queuedTasks_.push(std::move(storedTask));
}
}
template<template<typename> typename TAllocator>
std::size_t BaseSimpleTaskLoop<TAllocator>::getActiveTasks() const MIJIN_NOEXCEPT
{
std::size_t sum = 0;
for (const StoredTask& task : mijin::chain(tasks_, newTasks_))
{
const TaskStatus status = task.task ? task.task->status() : TaskStatus::FINISHED;
if (status == TaskStatus::SUSPENDED || status == TaskStatus::RUNNING)
{
++sum;
}
}
return sum;
}
template<template<typename> typename TAllocator>
inline auto BaseSimpleTaskLoop<TAllocator>::tick() -> CanContinue
{ {
// set current taskloop // set current taskloop
MIJIN_ASSERT(currentLoopStorage() == nullptr, "Trying to tick a loop from a coroutine, this is not supported."); MIJIN_ASSERT(TaskLoop<TAllocator>::currentLoopStorage() == nullptr, "Trying to tick a loop from a coroutine, this is not supported.");
currentLoopStorage() = this; TaskLoop<TAllocator>::currentLoopStorage() = this;
threadId_ = std::this_thread::get_id(); threadId_ = std::this_thread::get_id();
// move over all tasks from newTasks // move over all tasks from newTasks
@ -791,7 +1005,7 @@ inline auto SimpleTaskLoop::tick() -> CanContinue
continue; continue;
} }
status = tickTask(task); status = base_t::tickTask(task);
if (status == TaskStatus::SUSPENDED || status == TaskStatus::YIELDED) if (status == TaskStatus::SUSPENDED || status == TaskStatus::YIELDED)
{ {
@ -799,7 +1013,7 @@ inline auto SimpleTaskLoop::tick() -> CanContinue
} }
} }
// reset current loop // reset current loop
currentLoopStorage() = nullptr; TaskLoop<TAllocator>::currentLoopStorage() = nullptr;
// remove any tasks that have been transferred to another queue // remove any tasks that have been transferred to another queue
it = std::remove_if(tasks_.begin(), tasks_.end(), [](const StoredTask& task) { it = std::remove_if(tasks_.begin(), tasks_.end(), [](const StoredTask& task) {
@ -810,7 +1024,8 @@ inline auto SimpleTaskLoop::tick() -> CanContinue
return canContinue; return canContinue;
} }
inline void SimpleTaskLoop::runUntilDone(IgnoreWaiting ignoreWaiting) template<template<typename> typename TAllocator>
void BaseSimpleTaskLoop<TAllocator>::runUntilDone(IgnoreWaiting ignoreWaiting)
{ {
while (!tasks_.empty() || !newTasks_.empty()) while (!tasks_.empty() || !newTasks_.empty())
{ {
@ -822,7 +1037,8 @@ inline void SimpleTaskLoop::runUntilDone(IgnoreWaiting ignoreWaiting)
} }
} }
inline void SimpleTaskLoop::cancelAllTasks() MIJIN_NOEXCEPT template<template<typename> typename TAllocator>
void BaseSimpleTaskLoop<TAllocator>::cancelAllTasks() MIJIN_NOEXCEPT
{ {
for (StoredTask& task : mijin::chain(tasks_, newTasks_)) for (StoredTask& task : mijin::chain(tasks_, newTasks_))
{ {
@ -835,9 +1051,10 @@ inline void SimpleTaskLoop::cancelAllTasks() MIJIN_NOEXCEPT
} }
} }
inline std::vector<TaskHandle> SimpleTaskLoop::getAllTasks() const MIJIN_NOEXCEPT template<template<typename> typename TAllocator>
std::vector<TaskHandle, TAllocator<TaskHandle>> BaseSimpleTaskLoop<TAllocator>::getAllTasks() const MIJIN_NOEXCEPT
{ {
std::vector<TaskHandle> result; std::vector<TaskHandle, TAllocator<TaskHandle>> result((TAllocator<TaskHandle>(TaskLoop<TAllocator>::allocator_)));
for (const StoredTask& task : mijin::chain(tasks_, newTasks_)) for (const StoredTask& task : mijin::chain(tasks_, newTasks_))
{ {
result.emplace_back(task.task->sharedState()); result.emplace_back(task.task->sharedState());
@ -845,6 +1062,151 @@ inline std::vector<TaskHandle> SimpleTaskLoop::getAllTasks() const MIJIN_NOEXCEP
return result; return result;
} }
template<template<typename> typename TAllocator>
void BaseMultiThreadedTaskLoop<TAllocator>::managerThread(std::stop_token stopToken) // NOLINT(performance-unnecessary-value-param)
{
// setCurrentThreadName("Task Manager");
while (!stopToken.stop_requested())
{
// first clear out any parked tasks that are actually finished
auto itRem = std::remove_if(parkedTasks_.begin(), parkedTasks_.end(), [](StoredTask& task) {
return !task.task || task.task->status() == TaskStatus::FINISHED;
});
parkedTasks_.erase(itRem, parkedTasks_.end());
// then try to push any task from the buffer into the queue, if possible
for (auto it = parkedTasks_.begin(); it != parkedTasks_.end();)
{
if (!it->task->canResume())
{
++it;
continue;
}
if (readyTasks_.tryPushMaybeMove(*it)) {
it = parkedTasks_.erase(it);
}
else {
break;
}
}
// then clear the incoming task queue
while (true)
{
std::optional<StoredTask> task = queuedTasks_.tryPop();
if (!task.has_value()) {
break;
}
// try to directly move it into the next queue
if (readyTasks_.tryPushMaybeMove(*task)) {
continue;
}
// otherwise park it
parkedTasks_.push_back(std::move(*task));
}
// next collect tasks returning from the worker threads
while (true)
{
std::optional<StoredTask> task = returningTasks_.tryPop();
if (!task.has_value()) {
break;
}
if (task->task == nullptr || task->task->status() == TaskStatus::FINISHED) {
continue; // task has been transferred or finished
}
if (task->task->canResume() && readyTasks_.tryPushMaybeMove(*task)) {
continue; // instantly resume, no questions asked
}
// otherwise park it for future processing
parkedTasks_.push_back(std::move(*task));
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
template<template<typename> typename TAllocator>
void BaseMultiThreadedTaskLoop<TAllocator>::workerThread(std::stop_token stopToken, std::size_t workerId) // NOLINT(performance-unnecessary-value-param)
{
TaskLoop<TAllocator>::currentLoopStorage() = this; // forever (on this thread)
std::array<char, 16> threadName;
(void) std::snprintf(threadName.data(), 16, "Task Worker %lu", static_cast<unsigned long>(workerId));
// setCurrentThreadName(threadName.data());
while (!stopToken.stop_requested())
{
// try to fetch a task to run
std::optional<StoredTask> task = readyTasks_.tryPop();
if (!task.has_value())
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
continue;
}
// run it
currentTask_ = &*task;
impl::gCurrentTaskState = task->task->sharedState();
tickTask(*task);
currentTask_ = nullptr;
impl::gCurrentTaskState = nullptr;
// and give it back
returningTasks_.push(std::move(*task));
}
}
template<template<typename> typename TAllocator>
void BaseMultiThreadedTaskLoop<TAllocator>::transferCurrentTask(TaskLoop<TAllocator>& otherLoop) MIJIN_NOEXCEPT
{
if (&otherLoop == this) {
return;
}
MIJIN_ASSERT_FATAL(currentTask_ != nullptr, "Trying to call transferCurrentTask() while not running a task!");
// now start the transfer, first disown the task
StoredTask storedTask = std::move(*currentTask_);
currentTask_->task = nullptr; // just to be sure
// then send it over to the other loop
otherLoop.addStoredTask(std::move(storedTask));
}
template<template<typename> typename TAllocator>
void BaseMultiThreadedTaskLoop<TAllocator>::addStoredTask(StoredTask&& storedTask) MIJIN_NOEXCEPT
{
storedTask.task->setLoop(this);
// just assume we are not on the manager thread, as that wouldn't make sense
queuedTasks_.push(std::move(storedTask));
}
template<template<typename> typename TAllocator>
void BaseMultiThreadedTaskLoop<TAllocator>::start(std::size_t numWorkerThreads)
{
managerThread_ = std::jthread([this](std::stop_token stopToken) { managerThread(std::move(stopToken)); });
workerThreads_.reserve(numWorkerThreads);
for (std::size_t workerId = 0; workerId < numWorkerThreads; ++workerId) {
workerThreads_.emplace_back([this, workerId](std::stop_token stopToken) { workerThread(std::move(stopToken), workerId); });
}
}
template<template<typename> typename TAllocator>
void BaseMultiThreadedTaskLoop<TAllocator>::stop()
{
workerThreads_.clear(); // will also set the stop token
managerThread_ = {}; // this too
}
// utility stuff // utility stuff
inline TaskAwaitableSuspend c_suspend() { inline TaskAwaitableSuspend c_suspend() {
@ -871,8 +1233,8 @@ Task<> c_allDone(const TCollection<FuturePtr<TType>, TTemplateArgs...>& futures)
[[nodiscard]] inline TaskHandle getCurrentTask() MIJIN_NOEXCEPT [[nodiscard]] inline TaskHandle getCurrentTask() MIJIN_NOEXCEPT
{ {
MIJIN_ASSERT(impl::gCurrentTask != nullptr, "Attempt to call getCurrentTask() outside of task."); MIJIN_ASSERT(impl::gCurrentTaskState != nullptr, "Attempt to call getCurrentTask() outside of task.");
return TaskHandle(impl::gCurrentTask->task->sharedState()); return TaskHandle(impl::gCurrentTaskState);
} }
} }

View File

@ -33,11 +33,11 @@ inline constexpr signal_token_t INVALID_SIGNAL_TOKEN = std::numeric_limits<signa
MIJIN_DEFINE_FLAG(Oneshot); MIJIN_DEFINE_FLAG(Oneshot);
template<typename... TArgs> template<template<typename> typename TAllocator, typename... TArgs>
class Signal class BaseSignal
{ {
public: public:
using handler_t = std::function<void(TArgs...)>; using handler_t = std::function<void(TArgs...)>; // TODO: write a custom function wrapper with allocator support
using token_t = signal_token_t; using token_t = signal_token_t;
private: private:
struct RegisteredHandler struct RegisteredHandler
@ -47,18 +47,24 @@ private:
token_t token; token_t token;
Oneshot oneshot = Oneshot::NO; Oneshot oneshot = Oneshot::NO;
}; };
using handler_vector_t = std::vector<RegisteredHandler>; using handler_vector_t = std::vector<RegisteredHandler, TAllocator<RegisteredHandler>>;
private: private:
handler_vector_t handlers_; handler_vector_t handlers_;
token_t nextToken = 1; token_t nextToken = 1;
std::mutex handlersMutex_; std::mutex handlersMutex_;
public: public:
Signal() = default; explicit BaseSignal(TAllocator<void> allocator = {}) : handlers_(TAllocator<RegisteredHandler>(std::move(allocator))) {}
Signal(const Signal&) = delete; BaseSignal(const BaseSignal&) = delete;
Signal(Signal&&) MIJIN_NOEXCEPT = default; BaseSignal(BaseSignal&&) MIJIN_NOEXCEPT = default;
void reinit(TAllocator<void> allocator = {})
{
MIJIN_ASSERT(handlers_.empty(), "Attempting to re-initialize a signal that already has handlers.");
handlers_ = handler_vector_t(TAllocator<RegisteredHandler>(std::move(allocator)));
}
public: public:
Signal& operator=(const Signal&) = delete; BaseSignal& operator=(const BaseSignal&) = delete;
Signal& operator=(Signal&&) MIJIN_NOEXCEPT = default; BaseSignal& operator=(BaseSignal&&) MIJIN_NOEXCEPT = default;
public: public:
template<typename THandler, typename TWeak = void> template<typename THandler, typename TWeak = void>
inline token_t connect(THandler handler, Oneshot oneshot = Oneshot::NO, std::weak_ptr<TWeak> referenced = std::weak_ptr<TWeak>()) MIJIN_NOEXCEPT; inline token_t connect(THandler handler, Oneshot oneshot = Oneshot::NO, std::weak_ptr<TWeak> referenced = std::weak_ptr<TWeak>()) MIJIN_NOEXCEPT;
@ -70,13 +76,16 @@ public:
inline void emit(TArgs2&&... args) MIJIN_NOEXCEPT; inline void emit(TArgs2&&... args) MIJIN_NOEXCEPT;
}; };
template<typename... TArgs>
using Signal = BaseSignal<MIJIN_DEFAULT_ALLOCATOR, TArgs...>;
// //
// public functions // public functions
// //
template<typename... TArgs> template<template<typename> typename TAllocator, typename... TArgs>
template<typename THandler, typename TWeak> template<typename THandler, typename TWeak>
inline auto Signal<TArgs...>::connect(THandler handler, Oneshot oneshot, std::weak_ptr<TWeak> referenced) MIJIN_NOEXCEPT -> token_t inline auto BaseSignal<TAllocator, TArgs...>::connect(THandler handler, Oneshot oneshot, std::weak_ptr<TWeak> referenced) MIJIN_NOEXCEPT -> token_t
{ {
std::lock_guard lock(handlersMutex_); std::lock_guard lock(handlersMutex_);
@ -91,9 +100,9 @@ inline auto Signal<TArgs...>::connect(THandler handler, Oneshot oneshot, std::we
return nextToken++; return nextToken++;
} }
template<typename... TArgs> template<template<typename> typename TAllocator, typename... TArgs>
template<typename TObject, typename TWeak> template<typename TObject, typename TWeak>
inline auto Signal<TArgs...>::connect(TObject& object, void (TObject::* handler)(TArgs...), Oneshot oneshot, std::weak_ptr<TWeak> referenced) MIJIN_NOEXCEPT -> token_t inline auto BaseSignal<TAllocator, TArgs...>::connect(TObject& object, void (TObject::* handler)(TArgs...), Oneshot oneshot, std::weak_ptr<TWeak> referenced) MIJIN_NOEXCEPT -> token_t
{ {
std::lock_guard lock(handlersMutex_); std::lock_guard lock(handlersMutex_);
@ -111,8 +120,8 @@ inline auto Signal<TArgs...>::connect(TObject& object, void (TObject::* handler)
return nextToken++; return nextToken++;
} }
template<typename... TArgs> template<template<typename> typename TAllocator, typename... TArgs>
inline void Signal<TArgs...>::disconnect(token_t token) MIJIN_NOEXCEPT inline void BaseSignal<TAllocator, TArgs...>::disconnect(token_t token) MIJIN_NOEXCEPT
{ {
std::lock_guard lock(handlersMutex_); std::lock_guard lock(handlersMutex_);
@ -123,9 +132,9 @@ inline void Signal<TArgs...>::disconnect(token_t token) MIJIN_NOEXCEPT
handlers_.erase(it, handlers_.end()); handlers_.erase(it, handlers_.end());
} }
template<typename... TArgs> template<template<typename> typename TAllocator, typename... TArgs>
template<typename... TArgs2> template<typename... TArgs2>
inline void Signal<TArgs...>::emit(TArgs2&&... args) MIJIN_NOEXCEPT inline void BaseSignal<TAllocator, TArgs...>::emit(TArgs2&&... args) MIJIN_NOEXCEPT
{ {
std::lock_guard lock(handlersMutex_); std::lock_guard lock(handlersMutex_);

View File

@ -45,7 +45,7 @@ namespace mijin
#if MIJIN_DEBUG #if MIJIN_DEBUG
#define MIJIN_RAISE_ERROR(msg, source_loc) \ #define MIJIN_RAISE_ERROR(msg, source_loc) \
switch (mijin::handleError(msg, source_loc)) \ switch (mijin::handleError(msg, source_loc)) \
{ \ { \
case mijin::ErrorHandling::CONTINUE: \ case mijin::ErrorHandling::CONTINUE: \
@ -99,10 +99,10 @@ if (!static_cast<bool>(condition)) \
MIJIN_FATAL("Debug assertion failed: " #condition "\nMessage: " msg); \ MIJIN_FATAL("Debug assertion failed: " #condition "\nMessage: " msg); \
} }
#else // MIJIN_DEBUG #else // MIJIN_DEBUG
#define MIJIN_ERROR(...) #define MIJIN_ERROR(...) ((void)0)
#define MIJIN_FATAL(...) std::abort() #define MIJIN_FATAL(...) std::abort()
#define MIJIN_ASSERT(...) #define MIJIN_ASSERT(...) ((void)0)
#define MIJIN_ASSERT_FATAL(...) #define MIJIN_ASSERT_FATAL(...) ((void)0)
#endif // !MIJIN_DEBUG #endif // !MIJIN_DEBUG
// //

View File

@ -5,6 +5,7 @@
#define MIJIN_DEBUG_STACKTRACE_HPP_INCLUDED 1 #define MIJIN_DEBUG_STACKTRACE_HPP_INCLUDED 1
#include <cmath> #include <cmath>
#include <format>
#include <iomanip> #include <iomanip>
#include <vector> #include <vector>
#if __has_include(<fmt/format.h>) #if __has_include(<fmt/format.h>)
@ -87,6 +88,68 @@ TStream& operator<<(TStream& stream, const Stacktrace& stacktrace)
} // namespace mijin } // namespace mijin
template<typename TChar>
struct std::formatter<mijin::Stackframe, TChar>
{
using char_t = TChar;
template<typename TContext>
constexpr TContext::iterator parse(TContext& ctx)
{
auto it = ctx.begin();
auto end = ctx.end();
if (it != end && *it != MIJIN_SMART_QUOTE(char_t, '}'))
{
throw std::format_error("invalid format");
}
return it;
}
template<typename TContext>
TContext::iterator format(const mijin::Stackframe& stackframe, TContext& ctx) const
{
auto it = ctx.out();
it = std::format_to(it, MIJIN_SMART_QUOTE(char_t, "[{}] {}:{} in {}"), stackframe.address, stackframe.filename,
stackframe.lineNumber, mijin::demangleCPPIdentifier(stackframe.function.c_str()));
return it;
}
};
template<typename TChar>
struct std::formatter<mijin::Stacktrace, TChar>
{
using char_t = TChar;
template<class TContext>
constexpr TContext::iterator parse(TContext& ctx)
{
auto it = ctx.begin();
auto end = ctx.end();
if (it != end && *it != MIJIN_SMART_QUOTE(char_t, '}'))
{
throw std::format_error("invalid format");
}
return it;
}
template<typename TContext>
TContext::iterator format(const mijin::Stacktrace& stacktrace, TContext& ctx) const
{
const int numDigits = static_cast<int>(std::ceil(std::log10(stacktrace.getFrames().size())));
auto it = ctx.out();
it = std::format_to(it, MIJIN_SMART_QUOTE(char_t, "[{} frames]"), stacktrace.getFrames().size());
for (const auto& [idx, frame] : mijin::enumerate(stacktrace.getFrames()))
{
it = std::format_to(it, MIJIN_SMART_QUOTE(char_t, "\n #{:<{}} at {}"), idx, numDigits, frame);
}
return it;
}
};
#if __has_include(<fmt/format.h>) #if __has_include(<fmt/format.h>)
template<> template<>
struct fmt::formatter<mijin::Stackframe> struct fmt::formatter<mijin::Stackframe>

View File

@ -20,7 +20,8 @@ private:
[[no_unique_address]] TAllocator allocator_; [[no_unique_address]] TAllocator allocator_;
public: public:
explicit AllocatorDeleter(TAllocator allocator = {}) MIJIN_NOEXCEPT_IF(std::is_nothrow_move_constructible_v<TAllocator>) AllocatorDeleter() = default;
explicit AllocatorDeleter(TAllocator allocator) MIJIN_NOEXCEPT_IF(std::is_nothrow_move_constructible_v<TAllocator>)
: allocator_(std::move(allocator)) {} : allocator_(std::move(allocator)) {}
template<typename TOtherAllocator> requires (std::is_constructible_v<TAllocator, const TOtherAllocator&>) template<typename TOtherAllocator> requires (std::is_constructible_v<TAllocator, const TOtherAllocator&>)
@ -69,6 +70,10 @@ class AllocatorDeleter<std::allocator<T>>
{ {
public: public:
AllocatorDeleter() noexcept = default; AllocatorDeleter() noexcept = default;
template<typename TOther>
AllocatorDeleter(std::allocator<TOther>) noexcept {}
template<typename TOther> template<typename TOther>
AllocatorDeleter(const AllocatorDeleter<std::allocator<TOther>>&) noexcept {} AllocatorDeleter(const AllocatorDeleter<std::allocator<TOther>>&) noexcept {}

View File

@ -0,0 +1,192 @@
#pragma once
#if !defined(MIJIN_MEMORY_STACK_ALLOCATOR_HPP_INCLUDED)
#define MIJIN_MEMORY_STACK_ALLOCATOR_HPP_INCLUDED 1
#include "../debug/assert.hpp"
#include "../internal/common.hpp"
#include "../util/align.hpp"
#include "../util/traits.hpp"
namespace mijin
{
template<typename TValue, typename TStackAllocator>
struct StlStackAllocator
{
public:
using value_type = TValue;
private:
TStackAllocator* base_;
public:
explicit StlStackAllocator(TStackAllocator& base) MIJIN_NOEXCEPT : base_(&base) {}
template<typename TOtherValue>
StlStackAllocator(const StlStackAllocator<TOtherValue, TStackAllocator>& other) MIJIN_NOEXCEPT : base_(other.base) {}
template<typename TOtherValue>
StlStackAllocator& operator=(const StlStackAllocator<TOtherValue, TStackAllocator>& other) MIJIN_NOEXCEPT
{
base_ = other.base_;
return *this;
}
auto operator<=>(const StlStackAllocator&) const noexcept = default;
[[nodiscard]]
TValue* allocate(std::size_t count) const
{
return static_cast<TValue*>(base_->allocate(alignof(TValue), count * sizeof(TValue)));
}
void deallocate(TValue* /* ptr */, std::size_t /* count */) const MIJIN_NOEXCEPT {}
};
template<std::size_t chunkSize = 4096, template<typename> typename TBacking = MIJIN_DEFAULT_ALLOCATOR> requires (allocator_tmpl<TBacking>)
class StackAllocator
{
public:
using backing_t = TBacking<void>;
static constexpr std::size_t ACTUAL_CHUNK_SIZE = chunkSize - sizeof(void*) - sizeof(std::size_t);
template<typename T>
using stl_allocator_t = StlStackAllocator<T, StackAllocator<chunkSize, TBacking>>;
private:
struct Chunk
{
std::array<std::byte, ACTUAL_CHUNK_SIZE> data;
Chunk* next;
std::size_t allocated;
};
[[no_unique_address]] TBacking<Chunk> backing_;
Chunk* firstChunk_ = nullptr;
public:
StackAllocator() MIJIN_NOEXCEPT_IF(std::is_nothrow_default_constructible_v<backing_t>) = default;
explicit StackAllocator(backing_t backing) MIJIN_NOEXCEPT_IF((std::is_nothrow_constructible_v<TBacking<Chunk>, backing_t&&>))
: backing_(std::move(backing)) {}
StackAllocator(const StackAllocator&) = delete;
StackAllocator(StackAllocator&& other) MIJIN_NOEXCEPT_IF(std::is_nothrow_move_constructible_v<TBacking<Chunk>>)
: firstChunk_(std::exchange(other.firstChunk_, nullptr)) {}
~StackAllocator() noexcept
{
Chunk* chunk = firstChunk_;
while (chunk != nullptr)
{
Chunk* nextChunk = firstChunk_->next;
backing_.deallocate(chunk, 1);
chunk = nextChunk;
}
}
StackAllocator& operator=(const StackAllocator&) = delete;
StackAllocator& operator=(StackAllocator&& other) MIJIN_NOEXCEPT_IF(std::is_nothrow_move_assignable_v<TBacking<Chunk>>)
{
if (this != &other)
{
backing_ = std::move(other.backing_);
firstChunk_ = std::exchange(other.firstChunk_, nullptr);
}
return *this;
}
void* allocate(std::size_t alignment, std::size_t size)
{
// first check if this can ever fit
if (size > ACTUAL_CHUNK_SIZE)
{
return nullptr;
}
// then try to find space in the current chunks
for (Chunk* chunk = firstChunk_; chunk != nullptr; chunk = chunk->next)
{
const std::size_t remaining = ACTUAL_CHUNK_SIZE - chunk->allocated;
if (remaining < size)
{
continue;
}
std::byte* start = &chunk->data[chunk->allocated];
std::byte* pos = mijin::alignUp(start, alignment);
const std::ptrdiff_t alignmentBytes = pos - start;
const std::size_t combinedSize = size + alignmentBytes;
if (remaining < combinedSize)
{
continue;
}
chunk->allocated += combinedSize;
return pos;
}
// no free space in any chunk? allocate a new one
Chunk* newChunk = backing_.allocate(1);
if (newChunk == nullptr)
{
return nullptr;
}
initAndAddChunk(newChunk);
// now try with the new chunk
std::byte* start = newChunk->data.data();
std::byte* pos = mijin::alignUp(start, alignment);
const std::ptrdiff_t alignmentBytes = pos - start;
const std::size_t combinedSize = size + alignmentBytes;
// doesn't fit (due to alignment), time to give up
if (ACTUAL_CHUNK_SIZE < combinedSize)
{
return nullptr;
}
newChunk->allocated = combinedSize;
return pos;
}
void reset() noexcept
{
for (Chunk* chunk = firstChunk_; chunk != nullptr; chunk = chunk->next)
{
chunk->allocated = 0;
}
}
bool createChunks(std::size_t count)
{
if (count == 0)
{
return true;
}
Chunk* newChunks = backing_.allocate(count);
if (newChunks == nullptr)
{
return false;
}
// reverse so the chunks are chained from 0 to count (new chunks are inserted in front)
for (std::size_t pos = count; pos > 0; --pos)
{
initAndAddChunk(&newChunks[pos-1]);
}
return true;
}
template<typename T>
stl_allocator_t<T> makeStlAllocator() MIJIN_NOEXCEPT
{
return stl_allocator_t<T>(*this);
}
private:
void initAndAddChunk(Chunk* newChunk) noexcept
{
::new (newChunk) Chunk();
// put it in the front
newChunk->next = firstChunk_;
firstChunk_ = newChunk;
}
};
} // namespace mijin
#endif // !defined(MIJIN_MEMORY_STACK_ALLOCATOR_HPP_INCLUDED)

View File

@ -18,6 +18,12 @@ constexpr T alignUp(T value, T alignTo) MIJIN_NOEXCEPT
return value; return value;
} }
template<typename T>
T* alignUp(T* pointer, std::uintptr_t alignTo) MIJIN_NOEXCEPT
{
return std::bit_cast<T*>(alignUp(std::bit_cast<std::uintptr_t>(pointer), alignTo));
}
#define MIJIN_STRIDEOF(T) mijin::alignUp(sizeof(T), alignof(T)) #define MIJIN_STRIDEOF(T) mijin::alignUp(sizeof(T), alignof(T))
} // namespace mijin } // namespace mijin

View File

@ -29,7 +29,10 @@
namespace mijin namespace mijin
{ {
template<typename T> requires(!std::is_same_v<T, std::nullptr_t>) && requires(T t) { t == nullptr; } template<typename T>
concept nullable_type = !std::is_same_v<T, std::nullptr_t> && requires(T t) { t == nullptr; };
template<nullable_type T>
class NotNullable class NotNullable
{ {
private: private:
@ -41,6 +44,18 @@ public:
{ {
MIJIN_ASSERT(base_ != nullptr, "Constructed non-nullable type with nullptr."); MIJIN_ASSERT(base_ != nullptr, "Constructed non-nullable type with nullptr.");
} }
template<typename TOther> requires(std::is_constructible_v<T, const TOther&>)
constexpr NotNullable(const NotNullable<TOther>& other) MIJIN_NOEXCEPT_IF((std::is_nothrow_constructible_v<T, const TOther&>))
: base_(other.base_)
{
MIJIN_ASSERT(base_ != nullptr, "Constructed non-nullable type with nullptr.");
}
template<typename TOther> requires(std::is_constructible_v<T, TOther&&>)
constexpr NotNullable(NotNullable<TOther>&& other) MIJIN_NOEXCEPT_IF((std::is_nothrow_constructible_v<T, TOther&&>))
: base_(std::exchange(other.base_, nullptr))
{
MIJIN_ASSERT(base_ != nullptr, "Constructed non-nullable type with nullptr.");
}
template<typename TArg, typename... TArgs> requires(!std::is_same_v<TArg, std::nullptr_t> template<typename TArg, typename... TArgs> requires(!std::is_same_v<TArg, std::nullptr_t>
&& (!std::is_same_v<TArg, T> && sizeof...(TArgs) == 0) && (!std::is_same_v<TArg, T> && sizeof...(TArgs) == 0)
&& std::is_constructible_v<T, TArg&&, TArgs&&...>) && std::is_constructible_v<T, TArg&&, TArgs&&...>)
@ -129,6 +144,9 @@ public:
[[nodiscard]] [[nodiscard]]
constexpr const T& get() const MIJIN_NOEXCEPT { return base_; } constexpr const T& get() const MIJIN_NOEXCEPT { return base_; }
template<nullable_type TOther>
friend class NotNullable;
}; };
#if MIJIN_USE_GSL #if MIJIN_USE_GSL

View File

@ -165,6 +165,23 @@ struct detect_or<TDefault, TOper, TArgs...>
template<typename TDefault, template<typename...> typename TOper, typename... TArgs> template<typename TDefault, template<typename...> typename TOper, typename... TArgs>
using detect_or_t = detect_or<TDefault, TOper, TArgs...>::type; using detect_or_t = detect_or<TDefault, TOper, TArgs...>::type;
struct empty_type {};
template<typename T, bool enable>
struct optional_base
{
using type = T;
};
template<typename T>
struct optional_base<T, false>
{
using type = empty_type;
};
template<typename T, bool enable>
using optional_base_t = optional_base<T, enable>::type;
// //
// public functions // public functions
// //