mijin2/source/mijin/async/message_queue.hpp

192 lines
5.3 KiB
C++

#pragma once
#if !defined(MIJIN_MESSAGE_QUEUE_HPP_INCLUDED)
#define MIJIN_MESSAGE_QUEUE_HPP_INCLUDED 1
#include <array>
#include <atomic>
#include <optional>
#include <thread>
#include "../internal/common.hpp"
#include "../util/bitarray.hpp"
namespace mijin
{
//
// public defines
//
//
// public constants
//
//
// public types
//
template<typename TMessageQueue>
class MessageQueueIterator
{
public:
using value_type = typename TMessageQueue::message_t;
using reference = value_type&;
using pointer = value_type*;
private:
TMessageQueue* queue_ = nullptr;
std::optional<value_type> message_;
public:
MessageQueueIterator() = default;
explicit MessageQueueIterator(TMessageQueue& queue) MIJIN_NOEXCEPT : queue_(&queue) {
message_ = queue_->tryPop();
}
MessageQueueIterator(const MessageQueueIterator&) = delete;
MessageQueueIterator(MessageQueueIterator&&) = default;
MessageQueueIterator& operator=(const MessageQueueIterator&) = delete;
MessageQueueIterator& operator=(MessageQueueIterator&&) = default;
bool operator==(const MessageQueueIterator& other) MIJIN_NOEXCEPT
{
return message_.has_value() == other.message_.has_value();
}
bool operator!=(const MessageQueueIterator& other) MIJIN_NOEXCEPT
{
return !(*this == other);
}
reference operator*() MIJIN_NOEXCEPT
{
return message_.value();
}
pointer operator->() MIJIN_NOEXCEPT
{
return &message_.value();
}
MessageQueueIterator& operator++() MIJIN_NOEXCEPT
{
message_ = queue_->tryPop();
return *this;
}
};
template<typename TMessage, std::size_t bufferSize = 32>
class MessageQueue
{
public:
using message_t = TMessage;
using iterator_t = MessageQueueIterator<MessageQueue<TMessage, bufferSize>>;
static constexpr std::size_t BUFFER_SIZE = bufferSize;
private:
std::array<TMessage, bufferSize> messages_;
mijin::BitArray<bufferSize, true> messageReady_;
std::atomic_uint writePos_ = 0;
std::atomic_uint readPos_ = 0;
public:
MessageQueue() = default;
MessageQueue(const MessageQueue&) = delete;
MessageQueue(MessageQueue&&) = delete;
explicit MessageQueue(const std::array<TMessage, bufferSize>& messages) MIJIN_NOEXCEPT_IF(std::is_nothrow_copy_constructible_v<TMessage>)
: messages_(messages) {}
explicit MessageQueue(std::array<TMessage, bufferSize>&& messages) MIJIN_NOEXCEPT_IF(std::is_nothrow_move_constructible_v<TMessage>)
: messages_(std::move(messages)) {}
MessageQueue& operator=(const MessageQueue&) = delete;
MessageQueue& operator=(MessageQueue&&) = delete;
[[nodiscard]] bool tryPushMaybeMove(TMessage& message);
[[nodiscard]] bool tryPush(TMessage message) {
return tryPushMaybeMove(message);
}
void push(TMessage message);
[[nodiscard]] std::optional<TMessage> tryPop();
[[nodiscard]] TMessage wait();
iterator_t begin() MIJIN_NOEXCEPT { return iterator_t(*this); }
iterator_t end() MIJIN_NOEXCEPT { return iterator_t(); }
};
template<typename TRequest, typename TResponse, std::size_t requestBufferSize = 32, std::size_t responseBufferSize = 32>
struct TaskMessageQueue
{
MessageQueue<TRequest, requestBufferSize> requests;
MessageQueue<TResponse, responseBufferSize> responses;
};
//
// public functions
//
template<typename TMessage, std::size_t bufferSize>
bool MessageQueue<TMessage, bufferSize>::tryPushMaybeMove(TMessage& message)
{
unsigned oldWritePos = writePos_.load(std::memory_order_relaxed);
unsigned newWritePos = 0;
do
{
newWritePos = (oldWritePos + 1) % bufferSize;
if (newWritePos == readPos_) {
return false;
}
} while (!writePos_.compare_exchange_weak(oldWritePos, newWritePos, std::memory_order_release, std::memory_order_relaxed));
while (messageReady_.get(oldWritePos)) {
std::this_thread::yield(); // someone is still reading, wait...
}
messages_[oldWritePos] = std::move(message);
messageReady_.set(oldWritePos, true);
return true;
}
template<typename TMessage, std::size_t bufferSize>
void MessageQueue<TMessage, bufferSize>::push(TMessage message)
{
while (!tryPushMaybeMove(message)) {
std::this_thread::yield();
}
}
template<typename TMessage, std::size_t bufferSize>
std::optional<TMessage> MessageQueue<TMessage, bufferSize>::tryPop()
{
unsigned oldReadPos = readPos_.load(std::memory_order_relaxed);
unsigned newReadPos = 0;
do
{
if (oldReadPos == writePos_) {
return std::nullopt;
}
newReadPos = (oldReadPos + 1) % bufferSize;
} while (!readPos_.compare_exchange_weak(oldReadPos, newReadPos, std::memory_order_release, std::memory_order_relaxed));
while (!messageReady_.get(oldReadPos)) {
std::this_thread::yield(); // no harm in busy-waiting here, should be fast
};
TMessage message = std::move(messages_[oldReadPos]);
messageReady_.set(oldReadPos, false);
return message;
}
template<typename TMessage, std::size_t bufferSize>
TMessage MessageQueue<TMessage, bufferSize>::wait()
{
while (true)
{
std::optional<TMessage> message = tryPop();
if (message.has_value()) {
return message.value();
}
std::this_thread::yield();
}
}
} // namespace mijin
#endif // !defined(MIJIN_MESSAGE_QUEUE_HPP_INCLUDED)