Recently, due to an old project not support c++11. I used gcc __sync functions to implement a lock-free multi-writer and single-reader queue.
Here is the implemention:
template<class T>
struct ConcurrentQueue {
explicit ConcurrentQueue(uint32_t size)
: m_size(size + 1)
, m_headReady(0)
, m_tailReady(0)
, m_tailWriting(0)
{
m_records = new T[m_size];
assert (m_size != 0 && m_records != NULL); // uint32 overflow
}
~ConcurrentQueue() {
delete[] m_records;
}
// return:
// 0. success
// -1. queue full
int Push(const T &record) {
uint32_t tail = 0, tailNext = 0;
do {
tail = __sync_add_and_fetch(&m_tailWriting, 0);
tailNext = tail + 1;
if (tailNext == m_size) {
tailNext = 0;
}
if (tailNext == __sync_add_and_fetch(&m_headReady, 0)) { // full
return -1;
}
} while (!__sync_bool_compare_and_swap(&m_tailWriting, tail, tailNext));
m_records[tail] = record;
__sync_synchronize();
while (!__sync_bool_compare_and_swap(&m_tailReady, tail, tailNext)) {
sched_yield();
}
return 0;
}
// return:
// 0. success
// -1. queue empty
int Pop(T *record) {
if (m_headReady == __sync_add_and_fetch(&m_tailReady, 0)) return -1;
*record = m_records[m_headReady];
uint32_t headNext = m_headReady + 1;
if (headNext == m_size) {
headNext = 0;
}
__sync_synchronize();
__sync_lock_test_and_set(&m_headReady, headNext);
return 0;
}
// when call by reader, true value maybe bigger, because writer may be pushing value.
// when call by writer, value is not realiable due to other writer/reader.
uint32_t Count() {
return ((m_tailReady - m_headReady) + m_size) % m_size;
}
uint32_t Capacity() {
return m_size - 1;
}
bool Empty() {
return Count() == 0;
}
// only writer can do this
void Clear() {
uint32_t tailReady = __sync_add_and_fetch(&m_tailReady, 0);
__sync_lock_test_and_set(&m_headReady, tailReady);
}
public:
const uint32_t m_size;
volatile uint32_t m_headReady;
volatile uint32_t m_tailReady;
volatile uint32_t m_tailWriting;
T *m_records;
};
And I also wrote an unit test for it. Testing with multi writer thread and a single reader thread.
ConcurrentQueue<TestType> queue(10);
int writeIntervalus = 0;
int readIntervalus = 0;
volatile int pushed = 0;
volatile int pushFailed = 0;
volatile int popped = 0;
volatile int popFailed = 0;
volatile int workingWriter = 9;
void* Reader(void *arg) {
TestType tValue;
while (queue.Count() > 0 || workingWriter > 0) {
int ret = queue.Pop(&tValue);
if (ret == 0) __sync_fetch_and_add(&popped, 1);
else __sync_fetch_and_add(&popFailed, 1);
if (readIntervalus != 0)
usleep(readIntervalus);
}
}
void* Writer(void *arg) {
for (int i=0; i<100000; i++) {
TestType v;
int ret = queue.Push(v);
if (ret == 0) {
__sync_fetch_and_add(&pushed, 1);
}
else __sync_fetch_and_add(&pushFailed, 1);
if (writeIntervalus != 0)
usleep(writeIntervalus);
}
}
bool MultiWriterSingleReaderTest(int writeIntervalusl, int readIntervalusl) {
writeIntervalus = writeIntervalusl;
readIntervalus = readIntervalusl;
queue.Clear();
workingWriter = 9;
int totalWriter = workingWriter;
pushed = popped = pushFailed = popFailed = 0;
pthread_t pids[10];
for (int i=0; i<totalWriter; i++)
int ret = pthread_create(&pids[i], NULL, &Writer, NULL);
pthread_create(&pids[totalWriter], NULL, &Reader, NULL);
for (int i=0; i<totalWriter+1; i++) {
pthread_join(pids[i], NULL);
__sync_fetch_and_add(&workingWriter, -1);
}
if (pushed + pushFailed != 100000 * totalWriter || pushed != popped) {
return false;
}
return true;
}
But later I fount that, if I set the queue size too small, like 10 or 5, the MultiWriterSingleReaderTest(0, 0) with 9 writers sometimes return false.
I've really got blocked by this case, what's wrong?