192 lines
5.3 KiB
C++
192 lines
5.3 KiB
C++
|
|
#pragma once
|
|
|
|
#if !defined(MIJIN_MESSAGE_QUEUE_HPP_INCLUDED)
|
|
#define MIJIN_MESSAGE_QUEUE_HPP_INCLUDED 1
|
|
|
|
#include <array>
|
|
#include <atomic>
|
|
#include <optional>
|
|
#include <thread>
|
|
#include "../internal/common.hpp"
|
|
#include "../util/bitarray.hpp"
|
|
|
|
namespace mijin
|
|
{
|
|
|
|
//
|
|
// public defines
|
|
//
|
|
|
|
//
|
|
// public constants
|
|
//
|
|
|
|
//
|
|
// public types
|
|
//
|
|
|
|
template<typename TMessageQueue>
|
|
class MessageQueueIterator
|
|
{
|
|
public:
|
|
using value_type = typename TMessageQueue::message_t;
|
|
using reference = value_type&;
|
|
using pointer = value_type*;
|
|
private:
|
|
TMessageQueue* queue_ = nullptr;
|
|
std::optional<value_type> message_;
|
|
public:
|
|
MessageQueueIterator() = default;
|
|
explicit MessageQueueIterator(TMessageQueue& queue) MIJIN_NOEXCEPT : queue_(&queue) {
|
|
message_ = queue_->tryPop();
|
|
}
|
|
MessageQueueIterator(const MessageQueueIterator&) = delete;
|
|
MessageQueueIterator(MessageQueueIterator&&) = default;
|
|
|
|
MessageQueueIterator& operator=(const MessageQueueIterator&) = delete;
|
|
MessageQueueIterator& operator=(MessageQueueIterator&&) = default;
|
|
|
|
bool operator==(const MessageQueueIterator& other) MIJIN_NOEXCEPT
|
|
{
|
|
return message_.has_value() == other.message_.has_value();
|
|
}
|
|
bool operator!=(const MessageQueueIterator& other) MIJIN_NOEXCEPT
|
|
{
|
|
return !(*this == other);
|
|
}
|
|
|
|
reference operator*() MIJIN_NOEXCEPT
|
|
{
|
|
return message_.value();
|
|
}
|
|
|
|
pointer operator->() MIJIN_NOEXCEPT
|
|
{
|
|
return &message_.value();
|
|
}
|
|
|
|
MessageQueueIterator& operator++() MIJIN_NOEXCEPT
|
|
{
|
|
message_ = queue_->tryPop();
|
|
return *this;
|
|
}
|
|
};
|
|
|
|
template<typename TMessage, std::size_t bufferSize = 32>
|
|
class MessageQueue
|
|
{
|
|
public:
|
|
using message_t = TMessage;
|
|
using iterator_t = MessageQueueIterator<MessageQueue<TMessage, bufferSize>>;
|
|
static constexpr std::size_t BUFFER_SIZE = bufferSize;
|
|
private:
|
|
std::array<TMessage, bufferSize> messages_;
|
|
mijin::BitArray<bufferSize, true> messageReady_;
|
|
std::atomic_uint writePos_ = 0;
|
|
std::atomic_uint readPos_ = 0;
|
|
public:
|
|
MessageQueue() = default;
|
|
MessageQueue(const MessageQueue&) = delete;
|
|
MessageQueue(MessageQueue&&) = delete;
|
|
explicit MessageQueue(const std::array<TMessage, bufferSize>& messages) MIJIN_NOEXCEPT_IF(std::is_nothrow_copy_constructible_v<TMessage>)
|
|
: messages_(messages) {}
|
|
explicit MessageQueue(std::array<TMessage, bufferSize>&& messages) MIJIN_NOEXCEPT_IF(std::is_nothrow_move_constructible_v<TMessage>)
|
|
: messages_(std::move(messages)) {}
|
|
|
|
MessageQueue& operator=(const MessageQueue&) = delete;
|
|
MessageQueue& operator=(MessageQueue&&) = delete;
|
|
|
|
[[nodiscard]] bool tryPushMaybeMove(TMessage& message);
|
|
[[nodiscard]] bool tryPush(TMessage message) {
|
|
return tryPushMaybeMove(message);
|
|
}
|
|
void push(TMessage message);
|
|
[[nodiscard]] std::optional<TMessage> tryPop();
|
|
[[nodiscard]] TMessage wait();
|
|
|
|
iterator_t begin() MIJIN_NOEXCEPT { return iterator_t(*this); }
|
|
iterator_t end() MIJIN_NOEXCEPT { return iterator_t(); }
|
|
};
|
|
|
|
template<typename TRequest, typename TResponse, std::size_t requestBufferSize = 32, std::size_t responseBufferSize = 32>
|
|
struct TaskMessageQueue
|
|
{
|
|
MessageQueue<TRequest, requestBufferSize> requests;
|
|
MessageQueue<TResponse, responseBufferSize> responses;
|
|
};
|
|
|
|
//
|
|
// public functions
|
|
//
|
|
|
|
template<typename TMessage, std::size_t bufferSize>
|
|
bool MessageQueue<TMessage, bufferSize>::tryPushMaybeMove(TMessage& message)
|
|
{
|
|
unsigned oldWritePos = writePos_.load(std::memory_order_relaxed);
|
|
unsigned newWritePos = 0;
|
|
do
|
|
{
|
|
newWritePos = (oldWritePos + 1) % bufferSize;
|
|
if (newWritePos == readPos_) {
|
|
return false;
|
|
}
|
|
} while (!writePos_.compare_exchange_weak(oldWritePos, newWritePos, std::memory_order_release, std::memory_order_relaxed));
|
|
|
|
while (messageReady_.get(oldWritePos)) {
|
|
std::this_thread::yield(); // someone is still reading, wait...
|
|
}
|
|
|
|
messages_[oldWritePos] = std::move(message);
|
|
messageReady_.set(oldWritePos, true);
|
|
|
|
return true;
|
|
}
|
|
|
|
template<typename TMessage, std::size_t bufferSize>
|
|
void MessageQueue<TMessage, bufferSize>::push(TMessage message)
|
|
{
|
|
while (!tryPushMaybeMove(message)) {
|
|
std::this_thread::yield();
|
|
}
|
|
}
|
|
|
|
template<typename TMessage, std::size_t bufferSize>
|
|
std::optional<TMessage> MessageQueue<TMessage, bufferSize>::tryPop()
|
|
{
|
|
unsigned oldReadPos = readPos_.load(std::memory_order_relaxed);
|
|
unsigned newReadPos = 0;
|
|
do
|
|
{
|
|
if (oldReadPos == writePos_) {
|
|
return std::nullopt;
|
|
}
|
|
newReadPos = (oldReadPos + 1) % bufferSize;
|
|
} while (!readPos_.compare_exchange_weak(oldReadPos, newReadPos, std::memory_order_release, std::memory_order_relaxed));
|
|
|
|
while (!messageReady_.get(oldReadPos)) {
|
|
std::this_thread::yield(); // no harm in busy-waiting here, should be fast
|
|
};
|
|
|
|
TMessage message = std::move(messages_[oldReadPos]);
|
|
messageReady_.set(oldReadPos, false);
|
|
return message;
|
|
}
|
|
|
|
template<typename TMessage, std::size_t bufferSize>
|
|
TMessage MessageQueue<TMessage, bufferSize>::wait()
|
|
{
|
|
while (true)
|
|
{
|
|
std::optional<TMessage> message = tryPop();
|
|
if (message.has_value()) {
|
|
return message.value();
|
|
}
|
|
std::this_thread::yield();
|
|
}
|
|
}
|
|
|
|
} // namespace mijin
|
|
|
|
#endif // !defined(MIJIN_MESSAGE_QUEUE_HPP_INCLUDED)
|