Added allReady() and getAll() functions for futures and c_allDone() with tasks for tasks.

This commit is contained in:
Patrick 2025-07-01 18:19:38 +02:00
parent 973b62a348
commit 1d32530be7
2 changed files with 85 additions and 9 deletions

View File

@ -725,7 +725,12 @@ private:
std::vector<std::jthread, TAllocator<std::jthread>> workerThreads_; std::vector<std::jthread, TAllocator<std::jthread>> workerThreads_;
public: public:
explicit BaseMultiThreadedTaskLoop(allocator_t allocator = {}) MIJIN_NOEXCEPT_IF(std::is_nothrow_copy_constructible_v<allocator_t>) explicit BaseMultiThreadedTaskLoop(allocator_t allocator = {}) MIJIN_NOEXCEPT_IF(std::is_nothrow_copy_constructible_v<allocator_t>)
: base_t(std::move(allocator)), parkedTasks_(TAllocator<StoredTask>(allocator_)), workerThreads_(TAllocator<std::jthread>(allocator_)) {} : base_t(std::move(allocator)),
parkedTasks_(TAllocator<StoredTask>(allocator_)),
queuedTasks_(constructArray<StoredTask, MessageQueue<StoredTask>::BUFFER_SIZE>(allocator_)),
readyTasks_(constructArray<StoredTask, MessageQueue<StoredTask>::BUFFER_SIZE>(allocator_)),
returningTasks_(constructArray<StoredTask, MessageQueue<StoredTask>::BUFFER_SIZE>(allocator_)),
workerThreads_(TAllocator<std::jthread>(allocator_)) {}
public: // TaskLoop implementation public: // TaskLoop implementation
void transferCurrentTask(TaskLoop<TAllocator>& otherLoop) MIJIN_NOEXCEPT override; void transferCurrentTask(TaskLoop<TAllocator>& otherLoop) MIJIN_NOEXCEPT override;
@ -738,7 +743,11 @@ private: // private stuff
void managerThread(std::stop_token stopToken); void managerThread(std::stop_token stopToken);
void workerThread(std::stop_token stopToken, std::size_t workerId); void workerThread(std::stop_token stopToken, std::size_t workerId);
static thread_local StoredTask* currentTask_ = nullptr; static StoredTask*& getCurrentTask()
{
static thread_local StoredTask* task = nullptr;
return task;
}
}; };
using MultiThreadedTaskLoop = BaseMultiThreadedTaskLoop<>; using MultiThreadedTaskLoop = BaseMultiThreadedTaskLoop<>;
@ -1178,10 +1187,10 @@ void BaseMultiThreadedTaskLoop<TAllocator>::workerThread(std::stop_token stopTok
} }
// run it // run it
currentTask_ = &*task; getCurrentTask() = &*task;
impl::gCurrentTaskState = task->task->sharedState(); impl::gCurrentTaskState = task->task->sharedState();
tickTask(*task); tickTask(*task);
currentTask_ = nullptr; getCurrentTask() = nullptr;
impl::gCurrentTaskState = nullptr; impl::gCurrentTaskState = nullptr;
// and give it back // and give it back
@ -1196,11 +1205,11 @@ void BaseMultiThreadedTaskLoop<TAllocator>::transferCurrentTask(TaskLoop<TAlloca
return; return;
} }
MIJIN_ASSERT_FATAL(currentTask_ != nullptr, "Trying to call transferCurrentTask() while not running a task!"); MIJIN_ASSERT_FATAL(getCurrentTask() != nullptr, "Trying to call transferCurrentTask() while not running a task!");
// now start the transfer, first disown the task // now start the transfer, first disown the task
StoredTask storedTask = std::move(*currentTask_); StoredTask storedTask = std::move(*getCurrentTask());
currentTask_->task = nullptr; // just to be sure getCurrentTask()->task = nullptr; // just to be sure
// then send it over to the other loop // then send it over to the other loop
otherLoop.addStoredTask(std::move(storedTask)); otherLoop.addStoredTask(std::move(storedTask));
@ -1256,6 +1265,36 @@ Task<> c_allDone(const TCollection<FuturePtr<TType>, TTemplateArgs...>& futures)
} while (!allDone); } while (!allDone);
} }
template<template<typename> typename TAllocator, typename... TResult>
struct AllDoneHelper
{
TaskLoop<TAllocator>& currentTaskLoop;
template<typename T, std::size_t index>
auto makeFuture(TaskBase<T, TAllocator>&& task, std::array<TaskHandle, sizeof...(TResult)>& outHandles)
{
return currentTaskLoop.addTaskImpl(std::move(task), &outHandles[index]);
}
template<std::size_t... indices>
auto makeFutures(TaskBase<TResult, TAllocator>&&... tasks, std::array<TaskHandle, sizeof...(TResult)>& outHandles, std::index_sequence<indices...>)
{
return std::make_tuple(makeFuture<TResult, indices>(std::move(tasks), outHandles)...);
}
};
template<template<typename> typename TAllocator, typename... TResult>
TaskBase<std::tuple<TResult...>, TAllocator> c_allDone(TaskBase<TResult, TAllocator>&&... tasks)
{
TaskLoop<TAllocator>& currentTaskLoop = TaskLoop<TAllocator>::current();
std::tuple futures = std::make_tuple(currentTaskLoop.addTaskImpl(std::move(tasks), nullptr)...);
while (!allReady(futures)) {
co_await c_suspend();
}
co_return getAll(futures);
}
[[nodiscard]] inline TaskHandle getCurrentTask() MIJIN_NOEXCEPT [[nodiscard]] inline TaskHandle getCurrentTask() MIJIN_NOEXCEPT
{ {
MIJIN_ASSERT(impl::gCurrentTaskState != nullptr, "Attempt to call getCurrentTask() outside of task."); MIJIN_ASSERT(impl::gCurrentTaskState != nullptr, "Attempt to call getCurrentTask() outside of task.");

View File

@ -4,8 +4,9 @@
#if !defined(MIJIN_ASYNC_FUTURE_HPP_INCLUDED) #if !defined(MIJIN_ASYNC_FUTURE_HPP_INCLUDED)
#define MIJIN_ASYNC_FUTURE_HPP_INCLUDED 1 #define MIJIN_ASYNC_FUTURE_HPP_INCLUDED 1
#include <optional>
#include <memory> #include <memory>
#include <optional>
#include <tuple>
#include <type_traits> #include <type_traits>
#include "./signal.hpp" #include "./signal.hpp"
#include "../container/optional.hpp" #include "../container/optional.hpp"
@ -130,13 +131,49 @@ public: // signals
BaseSignal<TAllocator> sigSet; BaseSignal<TAllocator> sigSet;
}; };
template<typename TValue, template<typename> typename TAllocator = MIJIN_DEFAULT_ALLOCATOR> template<typename TValue = void, template<typename> typename TAllocator = MIJIN_DEFAULT_ALLOCATOR>
using FuturePtr = std::shared_ptr<Future<TValue, TAllocator>>; using FuturePtr = std::shared_ptr<Future<TValue, TAllocator>>;
// //
// public functions // public functions
// //
namespace impl
{
template<typename... TResult>
struct MultiFutureHelper
{
template<std::size_t... indices>
static bool allReady(const std::tuple<FuturePtr<TResult>...>& futures, std::index_sequence<indices...>) MIJIN_NOEXCEPT
{
return (std::get<indices>(futures)->ready() && ...);
}
template<std::size_t... indices>
static std::tuple<std::remove_reference_t<TResult>...> getAll(const std::tuple<FuturePtr<TResult>...>& futures, std::index_sequence<indices...>) MIJIN_NOEXCEPT
{
return std::make_tuple(std::move(std::get<indices>(futures)->get())...);
}
};
}
template<typename T, template<typename> typename TAllocator = MIJIN_DEFAULT_ALLOCATOR>
constexpr FuturePtr<T> makeSharedFuture(TAllocator<Future<T>> allocator = {}) MIJIN_NOEXCEPT
{
return std::allocate_shared<Future<T>>(std::move(allocator));
}
template<typename... TResult>
constexpr bool allReady(const std::tuple<FuturePtr<TResult>...>& futures) MIJIN_NOEXCEPT
{
return impl::MultiFutureHelper<TResult...>::allReady(futures, std::index_sequence_for<TResult...>());
}
template<typename... TResult>
constexpr std::tuple<std::remove_reference_t<TResult>...> getAll(const std::tuple<FuturePtr<TResult>...>& futures) MIJIN_NOEXCEPT
{
return impl::MultiFutureHelper<TResult...>::getAll(futures, std::index_sequence_for<TResult...>());
}
} // namespace mijin } // namespace mijin
#endif // !defined(MIJIN_ASYNC_FUTURE_HPP_INCLUDED) #endif // !defined(MIJIN_ASYNC_FUTURE_HPP_INCLUDED)