mijin2/source/mijin/async/message_queue.hpp
2023-05-29 14:51:44 +02:00

133 lines
3.4 KiB
C++

#pragma once
#if !defined(MIJIN_MESSAGE_QUEUE_HPP_INCLUDED)
#define MIJIN_MESSAGE_QUEUE_HPP_INCLUDED 1
#include <array>
#include <atomic>
#include <optional>
#include <thread>
#include <utility>
#include "../util/bitarray.hpp"
namespace mijin
{
//
// public defines
//
//
// public constants
//
//
// public types
//
// Fixed-size ring buffer of messages whose read and write positions are atomic
// counters advanced with compare-exchange.
//
// Template parameters:
//   TMessage   - message type; must be default-constructible (the slots live in a
//                std::array) and move-assignable (messages are moved into and out of slots).
//   bufferSize - number of slots. A push is rejected when advancing the write position
//                would make it equal to the read position, so at most bufferSize - 1
//                messages can be stored at once.
//
// The queue can be neither copied nor moved.
template<typename TMessage, std::size_t bufferSize = 32>
class MessageQueue
{
    // One slot always stays unused so that "readPos == writePos" can mean "empty".
    // With a single slot no push could ever succeed (push() would spin forever) and
    // with zero slots the "% bufferSize" index arithmetic would divide by zero.
    static_assert(bufferSize >= 2, "MessageQueue requires bufferSize >= 2 (one slot is always kept free)");
private:
    // Slot storage; indexed by the positions below.
    std::array<TMessage, bufferSize> messages;
    // Per-slot flag: set after a message was stored, cleared after it was taken out.
    // NOTE(review): the `true` argument presumably selects an atomic BitArray - confirm in bitarray.hpp.
    mijin::BitArray<bufferSize, true> messageReady;
    // Index of the next slot to be claimed by a push.
    std::atomic_uint writePos = 0;
    // Index of the next slot to be claimed by a pop.
    std::atomic_uint readPos = 0;
public:
    MessageQueue() = default;
    MessageQueue(const MessageQueue&) = delete;
    MessageQueue(MessageQueue&&) noexcept = delete;

    MessageQueue& operator=(const MessageQueue&) = delete;
    MessageQueue& operator=(MessageQueue&&) noexcept = delete;

    // Tries to enqueue `message`. On success the message has been moved from and true
    // is returned; if the buffer is full, `message` is left untouched and false is returned.
    [[nodiscard]] bool tryPushMaybeMove(TMessage& message);

    // By-value convenience wrapper around tryPushMaybeMove(); returns whether the
    // message was enqueued.
    [[nodiscard]] bool tryPush(TMessage message) {
        return tryPushMaybeMove(message);
    }

    // Enqueues `message`, yielding the current thread until there is room.
    void push(TMessage message);

    // Dequeues the next message, or returns std::nullopt if the queue is empty.
    [[nodiscard]] std::optional<TMessage> tryPop();

    // Dequeues the next message, yielding the current thread until one is available.
    [[nodiscard]] TMessage wait();
};
// Bundles two independent message queues: one carrying TRequest messages and one
// carrying TResponse messages, each with its own buffer size (32 slots by default).
// The type itself does not fix which party pushes to or pops from either queue.
template<
    typename TRequest,
    typename TResponse,
    std::size_t requestBufferSize = 32,
    std::size_t responseBufferSize = 32
>
struct TaskMessageQueue
{
    // Queue of TRequest messages.
    MessageQueue<TRequest, requestBufferSize> requests;

    // Queue of TResponse messages.
    MessageQueue<TResponse, responseBufferSize> responses;
};
//
// public functions
//
// Tries to claim the next write slot and store `message` in it.
//
// Returns false, leaving `message` untouched, when advancing the write position
// would make it equal to the read position (the buffer is treated as full).
// Otherwise `message` is moved into the claimed slot, the slot is flagged as ready
// and true is returned.
//
// NOTE(review): despite the "try" in the name, a successful claim can still spin
// (yielding) below until the slot's previous content has been taken out.
template<typename TMessage, std::size_t bufferSize>
bool MessageQueue<TMessage, bufferSize>::tryPushMaybeMove(TMessage& message)
{
unsigned oldWritePos = writePos.load(std::memory_order_relaxed);
unsigned newWritePos = 0;
// Claim a slot by advancing writePos by one (wrapping around at bufferSize).
// When the compare-exchange fails it reloads oldWritePos with the current value,
// so every retry recomputes newWritePos and repeats the "full" check.
do
{
newWritePos = (oldWritePos + 1) % bufferSize;
// One slot is always kept free: catching up with readPos counts as "full".
if (newWritePos == readPos) {
return false;
}
} while (!writePos.compare_exchange_weak(oldWritePos, newWritePos, std::memory_order_release, std::memory_order_relaxed));
// oldWritePos is now the index of the slot claimed by this call. Its ready flag is
// only cleared by tryPop() after the previous message has been moved out.
while (messageReady.get(oldWritePos)) {
std::this_thread::yield(); // someone is still reading, wait...
}
messages[oldWritePos] = std::move(message);
// Publish the slot: tryPop() waits for this flag before reading the message.
messageReady.set(oldWritePos, true);
return true;
}
// Enqueues `message`, retrying until the queue accepts it.
//
// Every rejected attempt yields the current thread before trying again. Retrying
// with the same object is fine because tryPushMaybeMove() only moves from
// `message` on the attempt that succeeds.
template<typename TMessage, std::size_t bufferSize>
void MessageQueue<TMessage, bufferSize>::push(TMessage message)
{
    for (;;)
    {
        if (tryPushMaybeMove(message)) {
            return;
        }
        std::this_thread::yield();
    }
}
// Tries to claim the next read slot and take the message out of it.
//
// Returns std::nullopt when the read position equals the write position (the queue
// is empty). Otherwise the message is moved out of the claimed slot, the slot's
// ready flag is cleared so it can be written again, and the message is returned.
template<typename TMessage, std::size_t bufferSize>
std::optional<TMessage> MessageQueue<TMessage, bufferSize>::tryPop()
{
    unsigned oldReadPos = readPos.load(std::memory_order_relaxed);
    unsigned newReadPos = 0;
    // Claim a slot by advancing readPos by one (wrapping around at bufferSize).
    // When the compare-exchange fails it reloads oldReadPos with the current value,
    // so every retry repeats the "empty" check against writePos.
    do
    {
        if (oldReadPos == writePos) {
            return std::nullopt;
        }
        newReadPos = (oldReadPos + 1) % bufferSize;
    } while (!readPos.compare_exchange_weak(oldReadPos, newReadPos, std::memory_order_release, std::memory_order_relaxed));

    // oldReadPos is now the index of the slot claimed by this call. The pusher
    // advances writePos before it stores the message, so the slot may not be
    // flagged as ready yet.
    while (!messageReady.get(oldReadPos)) {
        std::this_thread::yield(); // no harm in busy-waiting here, should be fast
    }

    TMessage message = std::move(messages[oldReadPos]);
    // Release the slot: a pusher that claimed it waits for this flag to be cleared.
    messageReady.set(oldReadPos, false);
    return message;
}
// Returns the next message, retrying tryPop() until it delivers one.
//
// Every empty attempt yields the current thread before trying again. The message
// is moved out of the optional returned by tryPop(), so this function does not
// require TMessage to be copyable.
template<typename TMessage, std::size_t bufferSize>
TMessage MessageQueue<TMessage, bufferSize>::wait()
{
    while (true)
    {
        if (std::optional<TMessage> message = tryPop()) {
            // Dereferencing an lvalue optional yields an lvalue; without the explicit
            // move the return statement would copy-construct the result.
            return std::move(*message);
        }
        std::this_thread::yield();
    }
}
} // namespace mijin
#endif // !defined(MIJIN_MESSAGE_QUEUE_HPP_INCLUDED)