187 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			187 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| 
 | |
| #pragma once
 | |
| 
 | |
| #if !defined(MIJIN_MESSAGE_QUEUE_HPP_INCLUDED)
 | |
| #define MIJIN_MESSAGE_QUEUE_HPP_INCLUDED 1
 | |
| 
 | |
| #include <array>
 | |
| #include <atomic>
 | |
| #include <optional>
 | |
| #include <thread>
 | |
| #include "../internal/common.hpp"
 | |
| #include "../util/bitarray.hpp"
 | |
| 
 | |
| namespace mijin
 | |
| {
 | |
| 
 | |
| //
 | |
| // public defines
 | |
| //
 | |
| 
 | |
| //
 | |
| // public constants
 | |
| //
 | |
| 
 | |
| //
 | |
| // public types
 | |
| //
 | |
| 
 | |
| template<typename TMessageQueue>
 | |
| class MessageQueueIterator
 | |
| {
 | |
| public:
 | |
|     using value_type = typename TMessageQueue::message_t;
 | |
|     using reference = value_type&;
 | |
|     using pointer = value_type*;
 | |
| private:
 | |
|     TMessageQueue* queue_ = nullptr;
 | |
|     std::optional<value_type> message_;
 | |
| public:
 | |
|     MessageQueueIterator() = default;
 | |
|     explicit MessageQueueIterator(TMessageQueue& queue) MIJIN_NOEXCEPT : queue_(&queue) {
 | |
|         message_ = queue_->tryPop();
 | |
|     }
 | |
|     MessageQueueIterator(const MessageQueueIterator&) = delete;
 | |
|     MessageQueueIterator(MessageQueueIterator&&) = default;
 | |
| 
 | |
|     MessageQueueIterator& operator=(const MessageQueueIterator&) = delete;
 | |
|     MessageQueueIterator& operator=(MessageQueueIterator&&) = default;
 | |
| 
 | |
|     bool operator==(const MessageQueueIterator& other) MIJIN_NOEXCEPT
 | |
|     {
 | |
|         return message_.has_value() == other.message_.has_value();
 | |
|     }
 | |
|     bool operator!=(const MessageQueueIterator& other) MIJIN_NOEXCEPT
 | |
|     {
 | |
|         return !(*this == other);
 | |
|     }
 | |
| 
 | |
|     reference operator*() MIJIN_NOEXCEPT
 | |
|     {
 | |
|         return message_.value();
 | |
|     }
 | |
| 
 | |
|     pointer operator->() MIJIN_NOEXCEPT
 | |
|     {
 | |
|         return &message_.value();
 | |
|     }
 | |
| 
 | |
|     MessageQueueIterator& operator++() MIJIN_NOEXCEPT
 | |
|     {
 | |
|         message_ = queue_->tryPop();
 | |
|         return *this;
 | |
|     }
 | |
| };
 | |
| 
 | |
| template<typename TMessage, std::size_t bufferSize = 32>
 | |
| class MessageQueue
 | |
| {
 | |
| public:
 | |
|     using message_t = TMessage;
 | |
|     using iterator_t = MessageQueueIterator<MessageQueue<TMessage, bufferSize>>;
 | |
| private:
 | |
|     std::array<TMessage, bufferSize> messages;
 | |
|     mijin::BitArray<bufferSize, true> messageReady;
 | |
|     std::atomic_uint writePos = 0;
 | |
|     std::atomic_uint readPos = 0;
 | |
| public:
 | |
|     MessageQueue() = default;
 | |
|     MessageQueue(const MessageQueue&) = delete;
 | |
|     MessageQueue(MessageQueue&&) = delete;
 | |
| 
 | |
|     MessageQueue& operator=(const MessageQueue&) = delete;
 | |
|     MessageQueue& operator=(MessageQueue&&) = delete;
 | |
| 
 | |
|     [[nodiscard]] bool tryPushMaybeMove(TMessage& message);
 | |
|     [[nodiscard]] bool tryPush(TMessage message) {
 | |
|         return tryPushMaybeMove(message);
 | |
|     }
 | |
|     void push(TMessage message);
 | |
|     [[nodiscard]] std::optional<TMessage> tryPop();
 | |
|     [[nodiscard]] TMessage wait();
 | |
| 
 | |
|     iterator_t begin() MIJIN_NOEXCEPT { return iterator_t(*this); }
 | |
|     iterator_t end() MIJIN_NOEXCEPT { return iterator_t(); }
 | |
| };
 | |
| 
 | |
| template<typename TRequest, typename TResponse, std::size_t requestBufferSize = 32, std::size_t responseBufferSize = 32>
 | |
| struct TaskMessageQueue
 | |
| {
 | |
|     MessageQueue<TRequest, requestBufferSize>   requests;
 | |
|     MessageQueue<TResponse, responseBufferSize> responses;
 | |
| };
 | |
| 
 | |
| //
 | |
| // public functions
 | |
| //
 | |
| 
 | |
| template<typename TMessage, std::size_t bufferSize>
 | |
| bool MessageQueue<TMessage, bufferSize>::tryPushMaybeMove(TMessage& message)
 | |
| {
 | |
|     unsigned oldWritePos = writePos.load(std::memory_order_relaxed);
 | |
|     unsigned newWritePos = 0;
 | |
|     do
 | |
|     {
 | |
|         newWritePos = (oldWritePos + 1) % bufferSize;
 | |
|         if (newWritePos == readPos) {
 | |
|             return false;
 | |
|         }
 | |
|     } while (!writePos.compare_exchange_weak(oldWritePos, newWritePos, std::memory_order_release, std::memory_order_relaxed));
 | |
| 
 | |
|     while (messageReady.get(oldWritePos)) {
 | |
|         std::this_thread::yield(); // someone is still reading, wait...
 | |
|     }
 | |
| 
 | |
|     messages[oldWritePos] = std::move(message);
 | |
|     messageReady.set(oldWritePos, true);
 | |
|     
 | |
|     return true;
 | |
| }
 | |
| 
 | |
| template<typename TMessage, std::size_t bufferSize>
 | |
| void MessageQueue<TMessage, bufferSize>::push(TMessage message)
 | |
| {
 | |
|     while (!tryPushMaybeMove(message)) {
 | |
|         std::this_thread::yield();
 | |
|     }
 | |
| }
 | |
| 
 | |
| template<typename TMessage, std::size_t bufferSize>
 | |
| std::optional<TMessage> MessageQueue<TMessage, bufferSize>::tryPop()
 | |
| {
 | |
|     unsigned oldReadPos = readPos.load(std::memory_order_relaxed);
 | |
|     unsigned newReadPos = 0;
 | |
|     do
 | |
|     {
 | |
|         if (oldReadPos == writePos) {
 | |
|             return std::nullopt;
 | |
|         }
 | |
|         newReadPos = (oldReadPos + 1) % bufferSize;
 | |
|     } while (!readPos.compare_exchange_weak(oldReadPos, newReadPos, std::memory_order_release, std::memory_order_relaxed));
 | |
| 
 | |
|     while (!messageReady.get(oldReadPos)) {
 | |
|         std::this_thread::yield(); // no harm in busy-waiting here, should be fast
 | |
|     };
 | |
| 
 | |
|     TMessage message = std::move(messages[oldReadPos]);
 | |
|     messageReady.set(oldReadPos, false);
 | |
|     return message;
 | |
| }
 | |
| 
 | |
| template<typename TMessage, std::size_t bufferSize>
 | |
| TMessage MessageQueue<TMessage, bufferSize>::wait()
 | |
| {
 | |
|     while (true)
 | |
|     {
 | |
|         std::optional<TMessage> message = tryPop();
 | |
|         if (message.has_value()) {
 | |
|             return message.value();
 | |
|         }
 | |
|         std::this_thread::yield();
 | |
|     }
 | |
| }
 | |
| 
 | |
| } // namespace mijin
 | |
| 
 | |
| #endif // !defined(MIJIN_MESSAGE_QUEUE_HPP_INCLUDED)
 |