Lots of windows fixes and some more improvements.
This commit is contained in:
@@ -79,15 +79,20 @@ class MessageQueue
|
||||
public:
|
||||
using message_t = TMessage;
|
||||
using iterator_t = MessageQueueIterator<MessageQueue<TMessage, bufferSize>>;
|
||||
static constexpr std::size_t BUFFER_SIZE = bufferSize;
|
||||
private:
|
||||
std::array<TMessage, bufferSize> messages;
|
||||
mijin::BitArray<bufferSize, true> messageReady;
|
||||
std::atomic_uint writePos = 0;
|
||||
std::atomic_uint readPos = 0;
|
||||
std::array<TMessage, bufferSize> messages_;
|
||||
mijin::BitArray<bufferSize, true> messageReady_;
|
||||
std::atomic_uint writePos_ = 0;
|
||||
std::atomic_uint readPos_ = 0;
|
||||
public:
|
||||
MessageQueue() = default;
|
||||
MessageQueue(const MessageQueue&) = delete;
|
||||
MessageQueue(MessageQueue&&) = delete;
|
||||
explicit MessageQueue(const std::array<TMessage, bufferSize>& messages) MIJIN_NOEXCEPT_IF(std::is_nothrow_copy_constructible_v<TMessage>)
|
||||
: messages_(messages) {}
|
||||
explicit MessageQueue(std::array<TMessage, bufferSize>&& messages) MIJIN_NOEXCEPT_IF(std::is_nothrow_move_constructible_v<TMessage>)
|
||||
: messages_(std::move(messages)) {}
|
||||
|
||||
MessageQueue& operator=(const MessageQueue&) = delete;
|
||||
MessageQueue& operator=(MessageQueue&&) = delete;
|
||||
@@ -118,22 +123,22 @@ struct TaskMessageQueue
|
||||
template<typename TMessage, std::size_t bufferSize>
|
||||
bool MessageQueue<TMessage, bufferSize>::tryPushMaybeMove(TMessage& message)
|
||||
{
|
||||
unsigned oldWritePos = writePos.load(std::memory_order_relaxed);
|
||||
unsigned oldWritePos = writePos_.load(std::memory_order_relaxed);
|
||||
unsigned newWritePos = 0;
|
||||
do
|
||||
{
|
||||
newWritePos = (oldWritePos + 1) % bufferSize;
|
||||
if (newWritePos == readPos) {
|
||||
if (newWritePos == readPos_) {
|
||||
return false;
|
||||
}
|
||||
} while (!writePos.compare_exchange_weak(oldWritePos, newWritePos, std::memory_order_release, std::memory_order_relaxed));
|
||||
} while (!writePos_.compare_exchange_weak(oldWritePos, newWritePos, std::memory_order_release, std::memory_order_relaxed));
|
||||
|
||||
while (messageReady.get(oldWritePos)) {
|
||||
while (messageReady_.get(oldWritePos)) {
|
||||
std::this_thread::yield(); // someone is still reading, wait...
|
||||
}
|
||||
|
||||
messages[oldWritePos] = std::move(message);
|
||||
messageReady.set(oldWritePos, true);
|
||||
messages_[oldWritePos] = std::move(message);
|
||||
messageReady_.set(oldWritePos, true);
|
||||
|
||||
return true;
|
||||
}
|
||||
@@ -149,22 +154,22 @@ void MessageQueue<TMessage, bufferSize>::push(TMessage message)
|
||||
template<typename TMessage, std::size_t bufferSize>
|
||||
std::optional<TMessage> MessageQueue<TMessage, bufferSize>::tryPop()
|
||||
{
|
||||
unsigned oldReadPos = readPos.load(std::memory_order_relaxed);
|
||||
unsigned oldReadPos = readPos_.load(std::memory_order_relaxed);
|
||||
unsigned newReadPos = 0;
|
||||
do
|
||||
{
|
||||
if (oldReadPos == writePos) {
|
||||
if (oldReadPos == writePos_) {
|
||||
return std::nullopt;
|
||||
}
|
||||
newReadPos = (oldReadPos + 1) % bufferSize;
|
||||
} while (!readPos.compare_exchange_weak(oldReadPos, newReadPos, std::memory_order_release, std::memory_order_relaxed));
|
||||
} while (!readPos_.compare_exchange_weak(oldReadPos, newReadPos, std::memory_order_release, std::memory_order_relaxed));
|
||||
|
||||
while (!messageReady.get(oldReadPos)) {
|
||||
while (!messageReady_.get(oldReadPos)) {
|
||||
std::this_thread::yield(); // no harm in busy-waiting here, should be fast
|
||||
};
|
||||
|
||||
TMessage message = std::move(messages[oldReadPos]);
|
||||
messageReady.set(oldReadPos, false);
|
||||
TMessage message = std::move(messages_[oldReadPos]);
|
||||
messageReady_.set(oldReadPos, false);
|
||||
return message;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user