I was trying to build a MPSC lock-free ring buffer for learning purpose, and am running into race conditions.
A description of the MPSC ring buffer:
- It is guaranteed that poll() is never called when the buffer is empty.
- Instead of mod'ing head and tail like a traditional ring buffer, it lets them proceed linearly, and AND's them before using them (since the buffer size is a power of 2, this works ok with overflow).
- We keep MAX_PRODUCERS-1 slots open in the queue so that if multiple producers come and see one slot is available and proceed, they can all place their entries.
- It uses 32-bit quantities for head and tail, so that it can snapshot them with a 64-bit atomic read without a lock.
My test involves a couple of threads writing some known set of values to the queue, and a consumer thread polling (when the buffer is not empty) and summing all, and verifying the correct result is obtained. With 2 or more producers, I get inconsistent sums (and with 1 producer, it works).
Any help would be much appreciated. Thank you!
Here is the code:
struct ring_buf_entry {
uint32_t seqn;
};
struct __attribute__((packed, aligned(8))) ring_buf {
union {
struct {
volatile uint32_t tail;
volatile uint32_t head;
};
volatile uint64_t snapshot;
};
volatile struct ring_buf_entry buf[RING_BUF_SIZE];
};
#define RING_SUB(x,y) ((x)>=(y)?((x)-(y)):((x)+(1ULL<<32)-(y)))
static void ring_buf_push(struct ring_buf* rb, uint32_t seqn)
{
size_t pos;
while (1) {
// rely on aligned, packed, and no member-reordering properties
uint64_t snapshot = __atomic_load_n(&(rb->snapshot), __ATOMIC_SEQ_CST);
// little endian.
uint64_t snap_head = snapshot >> 32;
uint64_t snap_tail = snapshot & 0xffffffffULL;
if (RING_SUB(snap_tail, snap_head) < RING_BUF_SIZE - MAX_PRODUCERS + 1) {
uint32_t exp = snap_tail;
if (__atomic_compare_exchange_n(&(rb->tail), &exp, snap_tail+1, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
pos = snap_tail;
break;
}
}
asm volatile("pause\n": : :"memory");
}
pos &= RING_BUF_SIZE-1;
rb->buf[pos].seqn = seqn;
asm volatile("sfence\n": : :"memory");
}
static struct ring_buf_entry ring_buf_poll(struct ring_buf* rb)
{
struct ring_buf_entry ret = rb->buf[__atomic_load_n(&(rb->head), __ATOMIC_SEQ_CST) & (RING_BUF_SIZE-1)];
__atomic_add_fetch(&(rb->head), 1, __ATOMIC_SEQ_CST);
return ret;
}