seeking advice and opinions about two different lock-free ring buffer designs.
Notes: The following is being tested on Intel 64-bit i5, on Linux. Only one thread is writing and only one thread is reading during these tests.
Parts of our application are still using an older lock-free ring buffer design. Here is the old design, chopped and edited:
class MidiRecFifo {
MidiRecordEvent fifo[MIDI_REC_FIFO_SIZE];
volatile int size;
int wIndex;
int rIndex;
public:
// Returns true on fifo overflow
bool put(const MidiRecordEvent& event)
{
if (size < MIDI_REC_FIFO_SIZE)
{
fifo[wIndex] = event;
wIndex = (wIndex + 1) % MIDI_REC_FIFO_SIZE;
++size;
return false;
}
return true;
}
MidiRecordEvent get()
{
MidiRecordEvent event(fifo[rIndex]);
rIndex = (rIndex + 1) % MIDI_REC_FIFO_SIZE;
--size;
return event;
}
int getSize() const { return size; }
};
Today I traced a rare elusive show-stopping bug. I caught the read index being permanently one position ahead of the write index, while the size was zero. Once it got into that state, all further operations on the buffer were incorrect as you can imagine.
Since only one thread is writing and only one thread reading these buffers, it seems to me that only the 'size' member could have become corrupt somehow, concurrently, leading to incorrect removals at 'get' time.
Question: How could the 'size' member in the old design have become corrupt if the test machine is Intel 64-bit? Is it not intrinsically atomic? Is the old design really not safe as I suspect?
Several years ago I replaced many but not all of the places using that old design, with my own design. Mine is template-ized and uses atomic variables. Our build configuration brings in the atomics library if necessary. Here is my design, complete in case anything stands out:
template <class T>
class LockFreeMPSCRingBuffer
{
unsigned int _capacity;
T *_fifo;
std::atomic<unsigned int> _size;
std::atomic<unsigned int> _wIndex;
std::atomic<unsigned int> _rIndex;
unsigned int _capacityMask;
// Rounds to the nearest or equal power of 2.
// For 0, 1, and 2, always returns 2.
unsigned int roundCapacity(unsigned int reqCap) const
{
unsigned int i;
for(i = 1; (1U << i) < reqCap; i++);
return 1U << i;
}
public:
// Start simple with just 2, like a flipping buffer for example.
LockFreeMPSCRingBuffer(unsigned int capacity = 2)
{
_capacity = roundCapacity(capacity);
_capacityMask = _capacity - 1;
_fifo = new T[_capacity];
clear();
}
~LockFreeMPSCRingBuffer()
{
if(_fifo)
delete[] _fifo;
}
void setCapacity(unsigned int capacity = 2)
{
if(_fifo)
delete[] _fifo;
_fifo = 0;
_capacity = roundCapacity(capacity);
_capacityMask = _capacity - 1;
_fifo = new T[_capacity];
}
// This is only for the writer.
// Returns true on success, false on fifo overflow or other error.
bool put(const T& item)
{
// Buffer full? Overflow condition.
if(getSize() >= _capacity)
return false;
// Safely read, then increment, the current write position.
//std::atomic<unsigned int> pos = _wIndex++;
unsigned int pos = _wIndex++;
// Mask the position for a circular effect.
pos &= _capacityMask;
// Store the item in that position.
_fifo[pos] = item;
// Now safely increment the size.
_size++;
// Success.
return true;
}
// This is only for the reader.
// Returns true on success, false if nothing to read or other error.
// NOTE: This is not multi-reader safe. Yet.
bool get(T& dst)
{
// Nothing to read?
if(isEmpty())
return false;
// Safely read, then increment, the current read position.
//std::atomic<unsigned int> pos = _rIndex++;
unsigned int pos = _rIndex++;
// Mask the position for a circular effect.
pos &= _capacityMask;
// Store the item in that position into the destination.
dst = _fifo[pos];
// Now safely decrement the size.
_size--;
// Success.
return true;
}
// This is only for the reader.
// NOTE: This is not multi-reader safe. Yet.
const T& peek(unsigned int n = 0)
{
// Safely read the current read position.
//std::atomic<unsigned int> pos = _rIndex.load();
unsigned int pos = _rIndex.load();
// Add the desired position.
pos += n;
// Mask the position for a circular effect.
pos &= _capacityMask;
return _fifo[pos];
}
// This is only for the reader.
// Returns true on success or false if nothing to remove or other error.
bool remove()
{
// Nothing to read?
if(isEmpty())
return false;
// Safely increment the current read position.
_rIndex++;
// Now safely decrement the size.
_size--;
// Success.
return true;
}
// This is only for the reader.
// Returns the number of items in the buffer.
inline unsigned int getSize() const { return _size.load(); }
// This is only for the reader.
inline bool isEmpty() const { return _size.load() == 0; }
// This is not thread safe, call it only when it is safe to do so.
void clear() { _size.store(0); _wIndex.store(0); _rIndex.store(0); }
// This is only for the reader.
// Clear the 'read' side of the ring buffer, which also clears the size.
// NOTE: A corresponding clearWrite() is not provided because it is dangerous to reset
// the size from the sender side - the receiver might cache the size, briefly.
// The sender should only grow the size while the receiver should only shrink it.
void clearRead()
{
//_size = 0; _rIndex = _wIndex;
//_size.store(0); _rIndex.store(_wIndex);
// Safely get a snapshot of the current size.
// Between here and the next lines, the size might increase from put().
// So we increment/decrement relatively, which is equivalent to
// calling remove(), sz times.
const unsigned int sz = getSize();
// Safely increment the current read position.
_rIndex += sz;
// Now safely decrement the size.
_size -= sz;
}
};
In the several years since, I have not seen any bugs or issues with that design in places that use it.
Questions:
Does my design look OK? Can I have confidence in it if I replace the offending old buffer with it, since the bug is really rare to catch?
In my design, if I understood correctly, when the architecture does not support the variables intrinsically, the atomic library will kick in, and may cause short blocking time when the variables are used. Inside a realtime audio callback thread, we are told not to use code that blocks, like malloc etc. How worried should I be about potential blocking time by the atomic library?
Thanks.