I am struggling with what must be a fundamental misunderstanding of how atomics work in C++. I have written the code below to implement a fast ring buffer that uses atomic variables for the indexes, so that multiple threads can write to and read from the buffer. I've whittled the code down to this simple case (which I realize is still a little long, sorry). If I run it on either Linux or Mac OS X, it works some of the time, but it also throws an exception at least 10% of the time. It also seems to run very fast, then slow down, and sometimes speed up again, which also suggests something is not quite right. I cannot find the flaw in my logic. Do I need a fence somewhere?
Here's a simple description of what it's trying to do: the atomic index variables are bumped up using the compare_exchange_weak method. This is meant to guarantee exclusive access to the slot the index was bumped from. Two indices are actually needed so that, as we wrap around the ring buffer, values are not overwritten. More details are embedded in the comments.
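To make the claim step concrete, here is a minimal standalone sketch of just the index bump, separate from the full program that follows. The names `kSlots`, `claim_slot`, and `count_claims` are made up for this illustration and do not appear in my real code; the sketch assumes the default (sequentially consistent) memory ordering.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

constexpr uint32_t kSlots = 10;  // hypothetical ring size (same value as MAX_EVENTS)

// Bump `index` by one slot (wrapping) and return the slot it was bumped from.
uint32_t claim_slot(std::atomic<uint32_t>& index) {
    uint32_t idx = index.load();
    // A failed compare_exchange_weak stores the current value into idx,
    // so the loop simply retries with the fresh value.
    while (!index.compare_exchange_weak(idx, (idx + 1) % kSlots)) {
    }
    assert(idx < kSlots);
    return idx;
}

// Let nthreads threads claim per_thread slots each; return claims per slot.
std::vector<uint64_t> count_claims(int nthreads, int per_thread) {
    std::atomic<uint32_t> index{0};
    // Each thread counts into its own row, so the counters need no locking.
    std::vector<std::vector<uint64_t>> local(
        nthreads, std::vector<uint64_t>(kSlots, 0));
    std::vector<std::thread> threads;
    for (int t = 0; t < nthreads; t++) {
        threads.emplace_back([&index, &local, t, per_thread] {
            for (int i = 0; i < per_thread; i++) local[t][claim_slot(index)]++;
        });
    }
    for (auto& th : threads) th.join();

    // Sum the per-thread rows after all threads have finished.
    std::vector<uint64_t> total(kSlots, 0);
    for (const auto& counts : local)
        for (uint32_t s = 0; s < kSlots; s++) total[s] += counts[s];
    return total;
}
```

Calling `count_claims(4, 2500)` should report every one of the 10 slots claimed exactly 1000 times, because each successful exchange moves the index forward by exactly one. Note that this only shows the bump handing out each index value to one thread at a time. It says nothing about whether the slot's contents are ready to be used, which is what ibegin and iend are meant to cover.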
#include <mutex>
#include <atomic>
#include <chrono>
#include <iostream>
#include <cstdint>
#include <vector>
#include <thread>
using namespace std;

const uint64_t Nevents = 1000000;
std::atomic<uint64_t> Nwritten(0);
std::atomic<uint64_t> Nread(0);
#define MAX_EVENTS 10
mutex MTX;

std::atomic<uint32_t> iread{0};  // The slot that the next thread will try to read from
std::atomic<uint32_t> iwrite{0}; // The slot that the next thread will try to write to
std::atomic<uint32_t> ibegin{0}; // The slot indicating the beginning of the read region
std::atomic<uint32_t> iend{0};   // The slot indicating one-past-the-end of the read region
std::atomic<uint64_t> EVENT_QUEUE[MAX_EVENTS];

//-------------------------------
// WriteThreadATOMIC
//-------------------------------
void WriteThreadATOMIC(void)
{
    // Block here until main() releases MTX, so all threads start together
    MTX.lock();
    MTX.unlock();

    while( Nwritten < Nevents ){
        // Copy (atomic) iwrite index to local variable and calculate index
        // of next slot after it
        uint32_t idx = iwrite;
        uint32_t inext = (idx + 1) % MAX_EVENTS;
        if(inext == ibegin){
            // Queue is full
            continue;
        }

        // At this point it looks like slot "idx" is available to write to.
        // The next call ensures only one thread actually does write to it
        // since the compare_exchange_weak will succeed for only one.
        if(iwrite.compare_exchange_weak(idx, inext))
        {
            // OK, we've claimed exclusive access to the slot. We've also
            // bumped the iwrite index so another writer thread can try
            // writing to the next slot. Now we write to the slot.
            if(EVENT_QUEUE[idx] != 0) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -1;} // Dummy check. This should NEVER happen!
            EVENT_QUEUE[idx] = 1;
            Nwritten++;
            if(Nread>Nwritten) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -3;} // Dummy check. This should NEVER happen!

            // The idx slot now contains valid data so bump the iend index to
            // let reader threads know. Note: if multiple writer threads are
            // in play, this may spin waiting for another to bump iend to us
            // before we can bump it to the next slot.
            uint32_t save_idx = idx;
            while(!iend.compare_exchange_weak(idx, inext)) idx = save_idx;
        }
    }

    lock_guard<mutex> lck(MTX);
    cout << "WriteThreadATOMIC done" << endl;
}

//-------------------------------
// ReadThreadATOMIC
//-------------------------------
void ReadThreadATOMIC(void)
{
    // Block here until main() releases MTX, so all threads start together
    MTX.lock();
    MTX.unlock();

    while( Nread < Nevents ){
        uint32_t idx = iread;
        if(idx == iend) {
            // Queue is empty
            continue;
        }
        uint32_t inext = (idx + 1) % MAX_EVENTS;

        // At this point it looks like slot "idx" is available to read from.
        // The next call ensures only one thread actually does read from it
        // since the compare_exchange_weak will succeed for only one.
        if( iread.compare_exchange_weak(idx, inext) )
        {
            // Similar to above, we now have exclusive access to this slot
            // for reading.
            if(EVENT_QUEUE[idx] != 1) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -2;} // Dummy check. This should NEVER happen!
            EVENT_QUEUE[idx] = 0;
            Nread++;
            if(Nread>Nwritten) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -4;} // Dummy check. This should NEVER happen!

            // Bump ibegin freeing idx up for writing
            uint32_t save_idx = idx;
            while(!ibegin.compare_exchange_weak(idx, inext)) idx = save_idx;
        }
    }

    lock_guard<mutex> lck(MTX);
    cout << "ReadThreadATOMIC done" << endl;
}

//-------------------------------
// main
//-------------------------------
int main(int narg, char *argv[])
{
    int Nwrite_threads = 4;
    int Nread_threads = 4;

    for(int i=0; i<MAX_EVENTS; i++) EVENT_QUEUE[i] = 0;

    MTX.lock(); // Hold off threads until all are created

    // Launch writer and reader threads
    vector<std::thread *> atomic_threads;
    for(int i=0; i<Nwrite_threads; i++){
        atomic_threads.push_back( new std::thread(WriteThreadATOMIC) );
    }
    for(int i=0; i<Nread_threads; i++){
        atomic_threads.push_back( new std::thread(ReadThreadATOMIC) );
    }

    // Release all threads and wait for them to finish
    MTX.unlock();
    while( Nread < Nevents) {
        std::this_thread::sleep_for(std::chrono::microseconds(1000000));
        cout << "Nwritten: " << Nwritten << " Nread: " << Nread << endl;
    }

    // Join threads
    for(auto t : atomic_threads) t->join();
}
When I have caught this in a debugger, the failure is usually due to the wrong value in an EVENT_QUEUE slot. Sometimes, though, the Nread count exceeds Nwritten, which seems like it should be impossible. I don't think I need a fence, since everything is an atomic accessed with the default memory ordering, but I can't say for sure at this point, since I have to question everything I think I know.
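One thing I did notice about the Nread>Nwritten check itself: it reads two atomics with two separate loads, and as far as I know the language does not fix the order in which the two operands of `>` are evaluated, so the comparison is not a snapshot of both counters. The sketch below replays one possible interleaving by hand in a single thread. The names `n_written`, `n_read`, and `stale_check_fires` are hypothetical stand-ins, and I am not claiming this is what actually happens in my program, only that the check could in principle fire without a reader ever getting ahead of a writer.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Replays, in one thread, an interleaving in which the "written" operand
// of the comparison is loaded before the "read" operand.
bool stale_check_fires() {
    std::atomic<uint64_t> n_written{0};  // hypothetical stand-in for Nwritten
    std::atomic<uint64_t> n_read{0};     // hypothetical stand-in for Nread

    uint64_t w = n_written.load();  // first operand loaded: sees 0

    // Pretend other threads complete one write and then one read here.
    n_written++;
    n_read++;

    uint64_t r = n_read.load();     // second operand loaded: sees 1

    // The real counters never got out of order...
    assert(n_read.load() <= n_written.load());

    // ...yet the comparison of the two loaded values says otherwise.
    return r > w;
}
```

If that is right, this particular sanity check may be unreliable on its own, which would be separate from whatever is putting the wrong value in the EVENT_QUEUE slot.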
Any suggestion or insight would be appreciated.