#pragma once #if !defined(MIJIN_MESSAGE_QUEUE_HPP_INCLUDED) #define MIJIN_MESSAGE_QUEUE_HPP_INCLUDED 1 #include #include #include #include #include "../internal/common.hpp" #include "../util/bitarray.hpp" namespace mijin { // // public defines // // // public constants // // // public types // template class MessageQueueIterator { public: using value_type = typename TMessageQueue::message_t; using reference = value_type&; using pointer = value_type*; private: TMessageQueue* queue_ = nullptr; std::optional message_; public: MessageQueueIterator() = default; explicit MessageQueueIterator(TMessageQueue& queue) MIJIN_NOEXCEPT : queue_(&queue) { message_ = queue_->tryPop(); } MessageQueueIterator(const MessageQueueIterator&) = delete; MessageQueueIterator(MessageQueueIterator&&) = default; MessageQueueIterator& operator=(const MessageQueueIterator&) = delete; MessageQueueIterator& operator=(MessageQueueIterator&&) = default; bool operator==(const MessageQueueIterator& other) MIJIN_NOEXCEPT { return message_.has_value() == other.message_.has_value(); } bool operator!=(const MessageQueueIterator& other) MIJIN_NOEXCEPT { return !(*this == other); } reference operator*() MIJIN_NOEXCEPT { return message_.value(); } pointer operator->() MIJIN_NOEXCEPT { return &message_.value(); } MessageQueueIterator& operator++() MIJIN_NOEXCEPT { message_ = queue_->tryPop(); return *this; } }; template class MessageQueue { public: using message_t = TMessage; using iterator_t = MessageQueueIterator>; private: std::array messages; mijin::BitArray messageReady; std::atomic_uint writePos = 0; std::atomic_uint readPos = 0; public: MessageQueue() = default; MessageQueue(const MessageQueue&) = delete; MessageQueue(MessageQueue&&) = delete; MessageQueue& operator=(const MessageQueue&) = delete; MessageQueue& operator=(MessageQueue&&) = delete; [[nodiscard]] bool tryPushMaybeMove(TMessage& message); [[nodiscard]] bool tryPush(TMessage message) { return tryPushMaybeMove(message); } void push(TMessage message); [[nodiscard]] std::optional tryPop(); [[nodiscard]] TMessage wait(); iterator_t begin() MIJIN_NOEXCEPT { return iterator_t(*this); } iterator_t end() MIJIN_NOEXCEPT { return iterator_t(); } }; template struct TaskMessageQueue { MessageQueue requests; MessageQueue responses; }; // // public functions // template bool MessageQueue::tryPushMaybeMove(TMessage& message) { unsigned oldWritePos = writePos.load(std::memory_order_relaxed); unsigned newWritePos = 0; do { newWritePos = (oldWritePos + 1) % bufferSize; if (newWritePos == readPos) { return false; } } while (!writePos.compare_exchange_weak(oldWritePos, newWritePos, std::memory_order_release, std::memory_order_relaxed)); while (messageReady.get(oldWritePos)) { std::this_thread::yield(); // someone is still reading, wait... } messages[oldWritePos] = std::move(message); messageReady.set(oldWritePos, true); return true; } template void MessageQueue::push(TMessage message) { while (!tryPushMaybeMove(message)) { std::this_thread::yield(); } } template std::optional MessageQueue::tryPop() { unsigned oldReadPos = readPos.load(std::memory_order_relaxed); unsigned newReadPos = 0; do { if (oldReadPos == writePos) { return std::nullopt; } newReadPos = (oldReadPos + 1) % bufferSize; } while (!readPos.compare_exchange_weak(oldReadPos, newReadPos, std::memory_order_release, std::memory_order_relaxed)); while (!messageReady.get(oldReadPos)) { std::this_thread::yield(); // no harm in busy-waiting here, should be fast }; TMessage message = std::move(messages[oldReadPos]); messageReady.set(oldReadPos, false); return message; } template TMessage MessageQueue::wait() { while (true) { std::optional message = tryPop(); if (message.has_value()) { return message.value(); } std::this_thread::yield(); } } } // namespace mijin #endif // !defined(MIJIN_MESSAGE_QUEUE_HPP_INCLUDED)