I have a multiple-producer, single-consumer lock-free queue (MPSCQueue) combined with a std::counting_semaphore that notifies the consumer when new items are enqueued. The consumer calls dequeue() to try to fetch an item; if that succeeds, it calls sema.try_acquire(), which I expect to succeed whenever there is actually an item in the queue. However, in very rare cases an assertion fires because try_acquire() returned false even though the queue returned a valid item.
I can confirm there is truly only one consumer thread. There are no other places calling dequeue(). The code uses std::memory_order_seq_cst for atomic operations in the queue, and sema.release()/sema.acquire() for the semaphore.
It seems like a memory ordering or visibility issue where the consumer sees the newly linked node in the queue before it sees the corresponding release() operation on the semaphore. But I thought seq_cst plus semaphore release/acquire would be enough to guarantee proper ordering. Any ideas or clarifications on how to ensure we never see a valid item in the queue but still fail try_acquire()?
```cpp
#include <atomic>
#include <cassert>
#include <iostream>
#include <optional>
#include <semaphore>
#include <utility>

template <typename T>
class MPSCQueue {
    struct Node {
        T data;
        std::atomic<Node*> next;
        // Default constructor for the dummy node
        Node() : next(nullptr) {}
        // Constructor that moves the data in
        Node(T data_) : data(std::move(data_)), next(nullptr) {}
    };

    // Atomic head pointer for multiple producers
    std::atomic<Node*> head;
    // Tail pointer for the single consumer
    Node* tail;

public:
    std::atomic_size_t enqueue_count = 0;
    size_t dequeue_count = 0;

    MPSCQueue() {
        Node* dummy = new Node();
        head.store(dummy, std::memory_order_seq_cst);
        tail = dummy;
    }

    ~MPSCQueue() {
        Node* node = tail;
        while (node) {
            Node* next = node->next.load(std::memory_order_seq_cst);
            delete node;
            node = next;
        }
    }

    // Called by producers
    void enqueue(T data) {
        enqueue_count.fetch_add(1);
        Node* node = new Node(std::move(data));
        // Swap in the new node as the head
        Node* prev_head = head.exchange(node, std::memory_order_seq_cst);
        // Link the old head to the new node
        prev_head->next.store(node, std::memory_order_seq_cst);
    }

    // Called by the single consumer
    std::optional<T> dequeue() {
        // Check the next pointer of the tail
        Node* next = tail->next.load(std::memory_order_seq_cst);
        if (next) {
            // Move the data out
            T res = std::move(next->data);
            delete tail;
            tail = next;
            dequeue_count += 1;
            return res;
        }
        return std::nullopt;
    }

    size_t size() { return enqueue_count.load() - dequeue_count; }
};

template <typename T>
class MPSCQueueConsumerLock {
    MPSCQueue<T> queue;
    std::counting_semaphore<> sema{0};

public:
    void enqueue(T data) {
        queue.enqueue(std::move(data));
        // Release the semaphore to notify the consumer
        sema.release();
    }

    // Single consumer calls this
    T dequeue() {
        auto re = queue.dequeue();
        if (re.has_value()) {
            // We have an item, so we expect the semaphore count to be > 0
            if (!sema.try_acquire()) {
                // Unexpectedly fails in rare cases
                std::cerr << __FILE__ << ":" << __FUNCTION__
                          << " sema.try_acquire() should succeed, please check\n";
                assert(false);
            }
            return re.value();
        }
        // Otherwise, block until something is available
        sema.acquire();
        return queue.dequeue().value();
    }

    size_t size() { return queue.size(); }
};
```
Occasionally (with very low probability, but it does happen) the code hits:
```text
/path/to/mpsc.hpp:dequeue sema try_acquire should be success, please check
python: /path/to/mpsc.hpp:79: T MPSCQueueConsumerLock<T>::dequeue() [...]
Assertion `false' failed.
Aborted (core dumped)
```
I have verified there is truly only one consumer thread and this is the only place where dequeue() is called.
How can the consumer see the new node in MPSCQueue while the corresponding sema.release() call is not yet observed (so try_acquire() fails)? Is there a subtlety in std::counting_semaphore or the memory ordering that I'm missing? Any suggestions for keeping the consumer's view of the queue and the semaphore's count consistent?
Any pointers or advice would be greatly appreciated. Thank you!
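You have a race condition in your check: enqueue() first makes the item visible in the queue and only then increments the semaphore, so the consumer can run in between: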
```cpp
void enqueue(T data) {
    queue.enqueue(std::move(data));
    // producer thread suspended by the OS here
    sema.release();
}

T dequeue() {
    auto re = queue.dequeue();
    if (re.has_value()) {
        // consumer gets the item while the producer is suspended:
        // the semaphore has not been incremented yet!
        if (!sema.try_acquire()) {
            // ... assertion fires
        }
        // ...
    }
    // ...
}
```
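If you have two atomic operations A and B, and A happens before B, then a thread that sees B is guaranteed to see A as well; seeing A, however, tells you nothing about B. In your code A is linking the node into the queue and B is sema.release(), so finding an item does not mean the semaphore has been incremented yet. You need to decrement the semaphore before you dequeue: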
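```cpp
T dequeue() {
    sema.acquire();  // wait for a completed enqueue() first
    // Another producer may not have linked the front node yet; retry.
    std::optional<T> item;
    while (!(item = queue.dequeue())) {}
    return std::move(*item);
}
```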