187 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			187 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
 | 
						|
#pragma once
 | 
						|
 | 
						|
#if !defined(MIJIN_MESSAGE_QUEUE_HPP_INCLUDED)
 | 
						|
#define MIJIN_MESSAGE_QUEUE_HPP_INCLUDED 1
 | 
						|
 | 
						|
#include <array>
#include <atomic>
#include <optional>
#include <thread>
#include <utility>
#include "../internal/common.hpp"
#include "../util/bitarray.hpp"
 | 
						|
 | 
						|
namespace mijin
 | 
						|
{
 | 
						|
 | 
						|
//
 | 
						|
// public defines
 | 
						|
//
 | 
						|
 | 
						|
//
 | 
						|
// public constants
 | 
						|
//
 | 
						|
 | 
						|
//
 | 
						|
// public types
 | 
						|
//
 | 
						|
 | 
						|
template<typename TMessageQueue>
 | 
						|
class MessageQueueIterator
 | 
						|
{
 | 
						|
public:
 | 
						|
    using value_type = typename TMessageQueue::message_t;
 | 
						|
    using reference = value_type&;
 | 
						|
    using pointer = value_type*;
 | 
						|
private:
 | 
						|
    TMessageQueue* queue_ = nullptr;
 | 
						|
    std::optional<value_type> message_;
 | 
						|
public:
 | 
						|
    MessageQueueIterator() = default;
 | 
						|
    explicit MessageQueueIterator(TMessageQueue& queue) MIJIN_NOEXCEPT : queue_(&queue) {
 | 
						|
        message_ = queue_->tryPop();
 | 
						|
    }
 | 
						|
    MessageQueueIterator(const MessageQueueIterator&) = delete;
 | 
						|
    MessageQueueIterator(MessageQueueIterator&&) = default;
 | 
						|
 | 
						|
    MessageQueueIterator& operator=(const MessageQueueIterator&) = delete;
 | 
						|
    MessageQueueIterator& operator=(MessageQueueIterator&&) = default;
 | 
						|
 | 
						|
    bool operator==(const MessageQueueIterator& other) MIJIN_NOEXCEPT
 | 
						|
    {
 | 
						|
        return message_.has_value() == other.message_.has_value();
 | 
						|
    }
 | 
						|
    bool operator!=(const MessageQueueIterator& other) MIJIN_NOEXCEPT
 | 
						|
    {
 | 
						|
        return !(*this == other);
 | 
						|
    }
 | 
						|
 | 
						|
    reference operator*() MIJIN_NOEXCEPT
 | 
						|
    {
 | 
						|
        return message_.value();
 | 
						|
    }
 | 
						|
 | 
						|
    pointer operator->() MIJIN_NOEXCEPT
 | 
						|
    {
 | 
						|
        return &message_.value();
 | 
						|
    }
 | 
						|
 | 
						|
    MessageQueueIterator& operator++() MIJIN_NOEXCEPT
 | 
						|
    {
 | 
						|
        message_ = queue_->tryPop();
 | 
						|
        return *this;
 | 
						|
    }
 | 
						|
};
 | 
						|
 | 
						|
template<typename TMessage, std::size_t bufferSize = 32>
 | 
						|
class MessageQueue
 | 
						|
{
 | 
						|
public:
 | 
						|
    using message_t = TMessage;
 | 
						|
    using iterator_t = MessageQueueIterator<MessageQueue<TMessage, bufferSize>>;
 | 
						|
private:
 | 
						|
    std::array<TMessage, bufferSize> messages;
 | 
						|
    mijin::BitArray<bufferSize, true> messageReady;
 | 
						|
    std::atomic_uint writePos = 0;
 | 
						|
    std::atomic_uint readPos = 0;
 | 
						|
public:
 | 
						|
    MessageQueue() = default;
 | 
						|
    MessageQueue(const MessageQueue&) = delete;
 | 
						|
    MessageQueue(MessageQueue&&) = delete;
 | 
						|
 | 
						|
    MessageQueue& operator=(const MessageQueue&) = delete;
 | 
						|
    MessageQueue& operator=(MessageQueue&&) = delete;
 | 
						|
 | 
						|
    [[nodiscard]] bool tryPushMaybeMove(TMessage& message);
 | 
						|
    [[nodiscard]] bool tryPush(TMessage message) {
 | 
						|
        return tryPushMaybeMove(message);
 | 
						|
    }
 | 
						|
    void push(TMessage message);
 | 
						|
    [[nodiscard]] std::optional<TMessage> tryPop();
 | 
						|
    [[nodiscard]] TMessage wait();
 | 
						|
 | 
						|
    iterator_t begin() MIJIN_NOEXCEPT { return iterator_t(*this); }
 | 
						|
    iterator_t end() MIJIN_NOEXCEPT { return iterator_t(); }
 | 
						|
};
 | 
						|
 | 
						|
template<typename TRequest, typename TResponse, std::size_t requestBufferSize = 32, std::size_t responseBufferSize = 32>
 | 
						|
struct TaskMessageQueue
 | 
						|
{
 | 
						|
    MessageQueue<TRequest, requestBufferSize>   requests;
 | 
						|
    MessageQueue<TResponse, responseBufferSize> responses;
 | 
						|
};
 | 
						|
 | 
						|
//
 | 
						|
// public functions
 | 
						|
//
 | 
						|
 | 
						|
template<typename TMessage, std::size_t bufferSize>
 | 
						|
bool MessageQueue<TMessage, bufferSize>::tryPushMaybeMove(TMessage& message)
 | 
						|
{
 | 
						|
    unsigned oldWritePos = writePos.load(std::memory_order_relaxed);
 | 
						|
    unsigned newWritePos = 0;
 | 
						|
    do
 | 
						|
    {
 | 
						|
        newWritePos = (oldWritePos + 1) % bufferSize;
 | 
						|
        if (newWritePos == readPos) {
 | 
						|
            return false;
 | 
						|
        }
 | 
						|
    } while (!writePos.compare_exchange_weak(oldWritePos, newWritePos, std::memory_order_release, std::memory_order_relaxed));
 | 
						|
 | 
						|
    while (messageReady.get(oldWritePos)) {
 | 
						|
        std::this_thread::yield(); // someone is still reading, wait...
 | 
						|
    }
 | 
						|
 | 
						|
    messages[oldWritePos] = std::move(message);
 | 
						|
    messageReady.set(oldWritePos, true);
 | 
						|
    
 | 
						|
    return true;
 | 
						|
}
 | 
						|
 | 
						|
template<typename TMessage, std::size_t bufferSize>
 | 
						|
void MessageQueue<TMessage, bufferSize>::push(TMessage message)
 | 
						|
{
 | 
						|
    while (!tryPushMaybeMove(message)) {
 | 
						|
        std::this_thread::yield();
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
template<typename TMessage, std::size_t bufferSize>
 | 
						|
std::optional<TMessage> MessageQueue<TMessage, bufferSize>::tryPop()
 | 
						|
{
 | 
						|
    unsigned oldReadPos = readPos.load(std::memory_order_relaxed);
 | 
						|
    unsigned newReadPos = 0;
 | 
						|
    do
 | 
						|
    {
 | 
						|
        if (oldReadPos == writePos) {
 | 
						|
            return std::nullopt;
 | 
						|
        }
 | 
						|
        newReadPos = (oldReadPos + 1) % bufferSize;
 | 
						|
    } while (!readPos.compare_exchange_weak(oldReadPos, newReadPos, std::memory_order_release, std::memory_order_relaxed));
 | 
						|
 | 
						|
    while (!messageReady.get(oldReadPos)) {
 | 
						|
        std::this_thread::yield(); // no harm in busy-waiting here, should be fast
 | 
						|
    };
 | 
						|
 | 
						|
    TMessage message = std::move(messages[oldReadPos]);
 | 
						|
    messageReady.set(oldReadPos, false);
 | 
						|
    return message;
 | 
						|
}
 | 
						|
 | 
						|
template<typename TMessage, std::size_t bufferSize>
 | 
						|
TMessage MessageQueue<TMessage, bufferSize>::wait()
 | 
						|
{
 | 
						|
    while (true)
 | 
						|
    {
 | 
						|
        std::optional<TMessage> message = tryPop();
 | 
						|
        if (message.has_value()) {
 | 
						|
            return message.value();
 | 
						|
        }
 | 
						|
        std::this_thread::yield();
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
} // namespace mijin
 | 
						|
 | 
						|
#endif // !defined(MIJIN_MESSAGE_QUEUE_HPP_INCLUDED)
 |