diff --git a/source/mijin/async/coroutine.hpp b/source/mijin/async/coroutine.hpp index 5527b48..b81136b 100644 --- a/source/mijin/async/coroutine.hpp +++ b/source/mijin/async/coroutine.hpp @@ -725,7 +725,12 @@ private: std::vector> workerThreads_; public: explicit BaseMultiThreadedTaskLoop(allocator_t allocator = {}) MIJIN_NOEXCEPT_IF(std::is_nothrow_copy_constructible_v) - : base_t(std::move(allocator)), parkedTasks_(TAllocator(allocator_)), workerThreads_(TAllocator(allocator_)) {} + : base_t(std::move(allocator)), + parkedTasks_(TAllocator(allocator_)), + queuedTasks_(constructArray::BUFFER_SIZE>(allocator_)), + readyTasks_(constructArray::BUFFER_SIZE>(allocator_)), + returningTasks_(constructArray::BUFFER_SIZE>(allocator_)), + workerThreads_(TAllocator(allocator_)) {} public: // TaskLoop implementation void transferCurrentTask(TaskLoop& otherLoop) MIJIN_NOEXCEPT override; @@ -738,7 +743,11 @@ private: // private stuff void managerThread(std::stop_token stopToken); void workerThread(std::stop_token stopToken, std::size_t workerId); - static thread_local StoredTask* currentTask_ = nullptr; + static StoredTask*& getCurrentTask() + { + static thread_local StoredTask* task = nullptr; + return task; + } }; using MultiThreadedTaskLoop = BaseMultiThreadedTaskLoop<>; @@ -1178,10 +1187,10 @@ void BaseMultiThreadedTaskLoop::workerThread(std::stop_token stopTok } // run it - currentTask_ = &*task; + getCurrentTask() = &*task; impl::gCurrentTaskState = task->task->sharedState(); tickTask(*task); - currentTask_ = nullptr; + getCurrentTask() = nullptr; impl::gCurrentTaskState = nullptr; // and give it back @@ -1196,11 +1205,11 @@ void BaseMultiThreadedTaskLoop::transferCurrentTask(TaskLooptask = nullptr; // just to be sure + StoredTask storedTask = std::move(*getCurrentTask()); + getCurrentTask()->task = nullptr; // just to be sure // then send it over to the other loop otherLoop.addStoredTask(std::move(storedTask)); @@ -1256,6 +1265,36 @@ Task<> c_allDone(const TCollection, TTemplateArgs...>& futures) } while (!allDone); } + +template typename TAllocator, typename... TResult> +struct AllDoneHelper +{ + TaskLoop& currentTaskLoop; + + template + auto makeFuture(TaskBase&& task, std::array& outHandles) + { + return currentTaskLoop.addTaskImpl(std::move(task), &outHandles[index]); + } + + template + auto makeFutures(TaskBase&&... tasks, std::array& outHandles, std::index_sequence) + { + return std::make_tuple(makeFuture(std::move(tasks), outHandles)...); + } +}; + +template typename TAllocator, typename... TResult> +TaskBase, TAllocator> c_allDone(TaskBase&&... tasks) +{ + TaskLoop& currentTaskLoop = TaskLoop::current(); + std::tuple futures = std::make_tuple(currentTaskLoop.addTaskImpl(std::move(tasks), nullptr)...); + while (!allReady(futures)) { + co_await c_suspend(); + } + co_return getAll(futures); +} + [[nodiscard]] inline TaskHandle getCurrentTask() MIJIN_NOEXCEPT { MIJIN_ASSERT(impl::gCurrentTaskState != nullptr, "Attempt to call getCurrentTask() outside of task."); diff --git a/source/mijin/async/future.hpp b/source/mijin/async/future.hpp index c554873..ac156f9 100644 --- a/source/mijin/async/future.hpp +++ b/source/mijin/async/future.hpp @@ -4,8 +4,9 @@ #if !defined(MIJIN_ASYNC_FUTURE_HPP_INCLUDED) #define MIJIN_ASYNC_FUTURE_HPP_INCLUDED 1 -#include #include +#include +#include #include #include "./signal.hpp" #include "../container/optional.hpp" @@ -130,13 +131,49 @@ public: // signals BaseSignal sigSet; }; -template typename TAllocator = MIJIN_DEFAULT_ALLOCATOR> +template typename TAllocator = MIJIN_DEFAULT_ALLOCATOR> using FuturePtr = std::shared_ptr>; // // public functions // +namespace impl +{ +template +struct MultiFutureHelper +{ + template + static bool allReady(const std::tuple...>& futures, std::index_sequence) MIJIN_NOEXCEPT + { + return (std::get(futures)->ready() && ...); + } + + template + static std::tuple...> getAll(const std::tuple...>& futures, std::index_sequence) MIJIN_NOEXCEPT + { + return std::make_tuple(std::move(std::get(futures)->get())...); + } +}; +} + +template typename TAllocator = MIJIN_DEFAULT_ALLOCATOR> +constexpr FuturePtr makeSharedFuture(TAllocator> allocator = {}) MIJIN_NOEXCEPT +{ + return std::allocate_shared>(std::move(allocator)); +} + +template +constexpr bool allReady(const std::tuple...>& futures) MIJIN_NOEXCEPT +{ + return impl::MultiFutureHelper::allReady(futures, std::index_sequence_for()); +} + +template +constexpr std::tuple...> getAll(const std::tuple...>& futures) MIJIN_NOEXCEPT +{ + return impl::MultiFutureHelper::getAll(futures, std::index_sequence_for()); +} } // namespace mijin #endif // !defined(MIJIN_ASYNC_FUTURE_HPP_INCLUDED)