I am seeking advice and opinions about two different lock-free ring buffer designs.
Notes: The following is being tested on an Intel 64-bit i5, on Linux. Only one thread writes and only one thread reads during these tests.
Parts of our application are still using an older lock-free ring buffer design. Here is the old design, chopped and edited:
class MidiRecFifo {
    MidiRecordEvent fifo[MIDI_REC_FIFO_SIZE];
    volatile int size;
    int wIndex;
    int rIndex;
public:
    // Returns true on fifo overflow
    bool put(const MidiRecordEvent& event)
    {
        if (size < MIDI_REC_FIFO_SIZE)
        {
            fifo[wIndex] = event;
            wIndex = (wIndex + 1) % MIDI_REC_FIFO_SIZE;
            ++size;
            return false;
        }
        return true;
    }
    MidiRecordEvent get()
    {
        MidiRecordEvent event(fifo[rIndex]);
        rIndex = (rIndex + 1) % MIDI_REC_FIFO_SIZE;
        --size;
        return event;
    }
    int getSize() const { return size; }
};
Today I traced a rare, elusive, show-stopping bug: I caught the read index permanently one position ahead of the write index while the size was zero. Once it got into that state, all further operations on the buffer were incorrect, as you can imagine.
Since only one thread writes to and only one thread reads from these buffers, it seems to me that only the 'size' member could have become corrupt somehow, concurrently, leading to incorrect removals at 'get' time.
Question: How could the 'size' member in the old design have become corrupt if the test machine is Intel 64-bit? Is it not intrinsically atomic? Is the old design really not safe, as I suspect?
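For reference, the kind of two-thread stress loop that might eventually catch it looks like this. This is a sketch written for this post, reusing the MidiRecFifo and MidiRecordEvent declarations above, not our real test code:
#include <cstdio>
#include <thread>

// Static storage so the members of the old design start zero-initialized.
static MidiRecFifo fifo;

int main()
{
    std::thread writer([] {
        MidiRecordEvent ev; // assumes MidiRecordEvent is default-constructible
        for (long i = 0; i < 50000000L; ++i)
            fifo.put(ev);   // the overflow result is ignored for this test
    });
    std::thread reader([] {
        for (long i = 0; i < 50000000L; ++i)
            if (fifo.getSize() > 0)
                fifo.get();
    });
    writer.join();
    reader.join();
    // A healthy run ends with a size between 0 and MIDI_REC_FIFO_SIZE;
    // a negative or stuck value betrays the corruption described above.
    std::printf("final size: %d\n", fifo.getSize());
    return 0;
}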
Several years ago I replaced many, but not all, of the places using that old design with my own design. Mine is templatized and uses atomic variables. Our build configuration brings in the atomics library if necessary. Here is my design, complete, in case anything stands out:
template <class T>
class LockFreeMPSCRingBuffer
{
    unsigned int _capacity;
    T *_fifo;
    std::atomic<unsigned int> _size;
    std::atomic<unsigned int> _wIndex;
    std::atomic<unsigned int> _rIndex;
    unsigned int _capacityMask;
    // Rounds up to the next power of 2, or returns reqCap unchanged
    // if it is already a power of 2.
    // For 0, 1, and 2, always returns 2.
    unsigned int roundCapacity(unsigned int reqCap) const
    {
        unsigned int i;
        for(i = 1; (1U << i) < reqCap; i++);
        return 1U << i;
    }
public:
    // Start simple with just 2, like a flipping buffer for example.
    LockFreeMPSCRingBuffer(unsigned int capacity = 2)
    {
        _capacity = roundCapacity(capacity);
        _capacityMask = _capacity - 1;
        _fifo = new T[_capacity];
        clear();
    }
    ~LockFreeMPSCRingBuffer()
    {
        if(_fifo)
            delete[] _fifo;
    }
    void setCapacity(unsigned int capacity = 2)
    {
        if(_fifo)
            delete[] _fifo;
        _fifo = 0;
        _capacity = roundCapacity(capacity);
        _capacityMask = _capacity - 1;
        _fifo = new T[_capacity];
    }
    // This is only for the writer.
    // Returns true on success, false on fifo overflow or other error.
    bool put(const T& item)
    {
        // Buffer full? Overflow condition.
        if(getSize() >= _capacity)
            return false;
        // Safely read, then increment, the current write position.
        //std::atomic<unsigned int> pos = _wIndex++;
        unsigned int pos = _wIndex++;
        // Mask the position for a circular effect.
        pos &= _capacityMask;
        // Store the item in that position.
        _fifo[pos] = item;
        // Now safely increment the size.
        _size++;
        // Success.
        return true;
    }
    // This is only for the reader.
    // Returns true on success, false if nothing to read or other error.
    // NOTE: This is not multi-reader safe. Yet.
    bool get(T& dst)
    {
        // Nothing to read?
        if(isEmpty())
            return false;
        // Safely read, then increment, the current read position.
        //std::atomic<unsigned int> pos = _rIndex++;
        unsigned int pos = _rIndex++;
        // Mask the position for a circular effect.
        pos &= _capacityMask;
        // Store the item in that position into the destination.
        dst = _fifo[pos];
        // Now safely decrement the size.
        _size--;
        // Success.
        return true;
    }
    // This is only for the reader.
    // NOTE: This is not multi-reader safe. Yet.
    const T& peek(unsigned int n = 0)
    {
        // Safely read the current read position.
        //std::atomic<unsigned int> pos = _rIndex.load();
        unsigned int pos = _rIndex.load();
        // Add the desired position.
        pos += n;
        // Mask the position for a circular effect.
        pos &= _capacityMask;
        return _fifo[pos];
    }
    // This is only for the reader.
    // Returns true on success or false if nothing to remove or other error.
    bool remove()
    {
        // Nothing to read?
        if(isEmpty())
            return false;
        // Safely increment the current read position.
        _rIndex++;
        // Now safely decrement the size.
        _size--;
        // Success.
        return true;
    }
    // This is only for the reader.
    // Returns the number of items in the buffer.
    inline unsigned int getSize() const { return _size.load(); }
    // This is only for the reader.
    inline bool isEmpty() const { return _size.load() == 0; }
    // This is not thread safe, call it only when it is safe to do so.
    void clear() { _size.store(0); _wIndex.store(0); _rIndex.store(0); }
    // This is only for the reader.
    // Clear the 'read' side of the ring buffer, which also clears the size.
    // NOTE: A corresponding clearWrite() is not provided because it is dangerous to reset
    // the size from the sender side - the receiver might cache the size, briefly.
    // The sender should only grow the size while the receiver should only shrink it.
    void clearRead()
    {
        //_size = 0; _rIndex = _wIndex;
        //_size.store(0); _rIndex.store(_wIndex);
        // Safely get a snapshot of the current size.
        // Between here and the next lines, the size might increase from put().
        // So we increment/decrement relatively, which is equivalent to
        // calling remove(), sz times.
        const unsigned int sz = getSize();
        // Safely increment the current read position.
        _rIndex += sz;
        // Now safely decrement the size.
        _size -= sz;
    }
};
In the several years since, I have not seen any bugs or issues with that design in the places that use it.
Questions:
Does my design look OK? Can I have confidence in it if I replace the offending old buffer with it, given that the bug is too rare to confirm by testing alone?
In my design, if I understand correctly, when the architecture does not support the atomic variables intrinsically, the atomics library kicks in and may cause short periods of blocking when the variables are used. Inside a realtime audio callback thread we are told not to use code that blocks, like malloc etc. How worried should I be about potential blocking by the atomics library?
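For what it's worth, if I understand the standard library correctly, C++17 can answer the "could it lock?" part at compile time, with something like:
#include <atomic>

// If this assertion fails on a given target, std::atomic<unsigned int>
// may be implemented with a lock there and could block; on mainstream
// x86-64 toolchains it is lock-free.
static_assert(std::atomic<unsigned int>::is_always_lock_free,
              "atomic<unsigned int> may fall back to locking here");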
Thanks.
Of course, from a theoretical viewpoint the first version is definitely broken. volatile does not imply any memory synchronization between threads. Because there is no synchronization, all accesses to data members in the two threads are data races, causing undefined behavior.
Practically it may have a similar effect though, because compilers usually won't optimize around it, i.e. they won't reorder stores around it. If you are then on an architecture where release/acquire is implicit (such as x86), meaning that the CPU also won't reorder loads/stores, and if the load/store instructions that would be compiled for size are atomic, and none of the variables you need to share between the threads is kept in a register instead of being reloaded from/stored to memory, then it will still work. But all of that is more or less coincidental.
I think the most likely, and easy to verify, problem here is that it isn't well-defined how incrementing and decrementing a volatile should behave. Volatiles are not atomic and do not have read-modify-write (RMW) operations. So really, incrementing/decrementing a volatile is a load followed by a store. Since loads/stores on volatiles are supposed to be observable behavior, a reasonable interpretation is that the compiler ought to keep the load and store in separate machine instructions.
That is what GCC does, for example. It loads size into a register, adds 1 to the register, and then stores it back to memory. See https://godbolt.org/z/97qeGPven. It even reorders the load with the store to wIndex in the previous code line, so the optimization-barrier idea also doesn't work out completely.
So with that, the increment/decrement on size is not atomic at all, neither in theory nor in practice, and if the other thread intervenes between the load and the store, the counter becomes inconsistent.
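To see the lost update in action, here is a small stand-alone demo. It is my own sketch, not code from the question; it does nothing but concurrent non-atomic increments and decrements of a volatile counter, so with correct synchronization it would always end at zero:
#include <iostream>
#include <thread>

volatile int counter = 0;   // plays the role of 'size'
constexpr int N = 1000000;

int main()
{
    // The writer only increments and the reader only decrements, just
    // like ++size in put() and --size in get(). (Written as
    // counter = counter + 1 to avoid the C++20 deprecation of ++ on
    // volatiles mentioned below.)
    std::thread writer([] { for (int i = 0; i < N; ++i) counter = counter + 1; });
    std::thread reader([] { for (int i = 0; i < N; ++i) counter = counter - 1; });
    writer.join();
    reader.join();
    // This program has a data race, i.e. undefined behavior. In practice
    // it almost never prints 0: each update is a separate load and store,
    // and any update that interleaves between another thread's load and
    // store is simply lost.
    std::cout << counter << '\n';
}
Once size over- or undercounts by one like this, get() runs on an empty buffer, which is exactly how rIndex ends up stuck one position past wIndex.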
Because this volatile behavior can easily be misused, as seen here, incrementing and decrementing volatile integers has been deprecated in C++20. It is a nice example of why volatile isn't suitable for thread synchronization.
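For completeness, the minimal repair is to make size a real atomic. Here is a sketch of how the old class could look, reusing the question's MidiRecordEvent and MIDI_REC_FIFO_SIZE; this is my adaptation under the stated single-writer/single-reader assumption, not tested production code:
#include <atomic>

class MidiRecFifo {
    MidiRecordEvent fifo[MIDI_REC_FIFO_SIZE];
    std::atomic<int> size{0};   // was: volatile int size;
    int wIndex = 0;             // touched only by the writer
    int rIndex = 0;             // touched only by the reader
public:
    // Returns true on fifo overflow
    bool put(const MidiRecordEvent& event)
    {
        // Acquire pairs with the reader's release decrement, so the
        // slot we may overwrite has really been consumed already.
        if (size.load(std::memory_order_acquire) < MIDI_REC_FIFO_SIZE)
        {
            fifo[wIndex] = event;
            wIndex = (wIndex + 1) % MIDI_REC_FIFO_SIZE;
            // Release publishes the stored event to the reader.
            size.fetch_add(1, std::memory_order_release);
            return false;
        }
        return true;
    }
    // As in the original, the caller must check getSize() > 0 first.
    MidiRecordEvent get()
    {
        MidiRecordEvent event(fifo[rIndex]);
        rIndex = (rIndex + 1) % MIDI_REC_FIFO_SIZE;
        // Release hands the slot back to the writer.
        size.fetch_sub(1, std::memory_order_release);
        return event;
    }
    // Acquire pairs with the writer's release increment, so a nonzero
    // size guarantees the corresponding event is fully visible.
    int getSize() const { return size.load(std::memory_order_acquire); }
};
On x86 the acquire/release loads and stores compile to plain moves and the RMW to a lock-prefixed add, so this stays non-blocking and suitable for the realtime thread.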