0

Recently, due to an old project not support c++11. I used gcc __sync functions to implement a lock-free multi-writer and single-reader queue.

Here is the implemention:

template<class T>
struct ConcurrentQueue {
    
    explicit ConcurrentQueue(uint32_t size)
        : m_size(size + 1)
        , m_headReady(0)
        , m_tailReady(0)
        , m_tailWriting(0) 
    {
        m_records = new T[m_size];
        assert (m_size != 0 && m_records != NULL); // uint32 overflow
    }

    ~ConcurrentQueue() {
        delete[] m_records;
    }

    // return: 
    // 0.  success
    // -1. queue full
    int Push(const T &record) {
        uint32_t tail = 0, tailNext = 0;
        do {
            tail = __sync_add_and_fetch(&m_tailWriting, 0);
            tailNext = tail + 1;
            if (tailNext == m_size) {
                tailNext = 0;
            }

            if (tailNext == __sync_add_and_fetch(&m_headReady, 0)) { // full
                return -1;
            }
        } while (!__sync_bool_compare_and_swap(&m_tailWriting, tail, tailNext));

        m_records[tail] = record;
        __sync_synchronize();
        while (!__sync_bool_compare_and_swap(&m_tailReady, tail, tailNext)) {
            sched_yield();
        }
        return 0;
    }
    
    // return:
    // 0. success
    // -1. queue empty
    int Pop(T *record) {
        if (m_headReady == __sync_add_and_fetch(&m_tailReady, 0)) return -1;
        *record = m_records[m_headReady];
        uint32_t headNext = m_headReady + 1;
        if (headNext == m_size) {
            headNext = 0;
        }
        __sync_synchronize();
        __sync_lock_test_and_set(&m_headReady, headNext);
        return 0;
    }

    // when call by reader, true value maybe bigger, because writer may be pushing value.
    // when call by writer, value is not realiable due to other writer/reader.
    uint32_t Count() {
        return ((m_tailReady - m_headReady) + m_size) % m_size;
    }

    uint32_t Capacity() {
        return m_size - 1;
    }

    bool Empty() {
        return Count() == 0;
    }

    // only writer can do this
    void Clear() {
        uint32_t tailReady = __sync_add_and_fetch(&m_tailReady, 0);
        __sync_lock_test_and_set(&m_headReady, tailReady);
    }

    public:
        const uint32_t m_size;
        volatile uint32_t m_headReady;
        volatile uint32_t m_tailReady;
        volatile uint32_t m_tailWriting;
        T *m_records;
};

And I also wrote an unit test for it. Testing with multi writer thread and a single reader thread.

ConcurrentQueue<TestType> queue(10);
int writeIntervalus = 0;
int readIntervalus = 0;
volatile int pushed = 0;
volatile int pushFailed = 0;
volatile int popped = 0;
volatile int popFailed = 0;
volatile int workingWriter = 9;

void* Reader(void *arg) {
    TestType tValue;
    while (queue.Count() > 0 || workingWriter > 0) {
        int ret = queue.Pop(&tValue);
        if (ret == 0) __sync_fetch_and_add(&popped, 1);
        else __sync_fetch_and_add(&popFailed, 1);
        if (readIntervalus != 0)
            usleep(readIntervalus);
    }
}

void* Writer(void *arg) {
    for (int i=0; i<100000; i++) {
        TestType v;
        int ret = queue.Push(v);
        if (ret == 0) {
            __sync_fetch_and_add(&pushed, 1);
        }
        else __sync_fetch_and_add(&pushFailed, 1);

        if (writeIntervalus != 0)
            usleep(writeIntervalus);
    }
}

bool MultiWriterSingleReaderTest(int writeIntervalusl, int readIntervalusl) {
    writeIntervalus = writeIntervalusl;
    readIntervalus = readIntervalusl;
    queue.Clear();
    workingWriter = 9;
    int totalWriter = workingWriter;
    pushed = popped = pushFailed = popFailed = 0;
    
    pthread_t pids[10];
    for (int i=0; i<totalWriter; i++)
        int ret = pthread_create(&pids[i], NULL, &Writer, NULL);
    
    pthread_create(&pids[totalWriter], NULL, &Reader, NULL);
    for (int i=0; i<totalWriter+1; i++) {
        pthread_join(pids[i], NULL);
        __sync_fetch_and_add(&workingWriter, -1);
    }
    if (pushed + pushFailed != 100000 * totalWriter || pushed != popped) {
        return false;
    }
    return true;
}

But later I fount that, if I set the queue size too small, like 10 or 5, the MultiWriterSingleReaderTest(0, 0) with 9 writers sometimes return false.

I've really got blocked by this case, what's wrong?

Yl6482B5
  • 21
  • 2
  • *Recently, due to an old project not support c++11* -- Any reason not to do a rewrite using C++11? You never mentioned if you are using a compiler that supports C++11, only that the project was "old". – PaulMcKenzie Oct 31 '21 at 03:46
  • In my project, This code repository is dependent by many old repositories compiled in C++03. – Yl6482B5 Oct 31 '21 at 03:53
  • Why not use `__atomic` builtins? `__sync` builtins are deprecated and only support seq_cst, not more efficient acq/rel for pure-load and pure-store operations. The `__atomic` builtins still work with `gcc -std=gnu++03` or even `c++03`. Or you could just include GCC's `include/atomic` header as `"gccatomic.h"` or something and use the `std::atomic<>` API under a different name, if that header itself works when compiled in C++03 mode. – Peter Cordes Oct 31 '21 at 04:08
  • Access to a `volatile` object is [*in practice*](https://stackoverflow.com/questions/4557979/when-to-use-volatile-with-multi-threading/58535118#58535118) more or less like `atomic` load or store with `mo_relaxed`. If that's not what you want, use `__atomic_load_n` or `__atomic_store_n`, or `std::atomic`. – Peter Cordes Oct 31 '21 at 04:11
  • Appreciate it. I'll give it a try. – Yl6482B5 Oct 31 '21 at 04:16

1 Answers1

0

I find the reason now.

if (tailNext == __sync_add_and_fetch(&m_headReady, 0)) { // full
    return -1;
}

When writer do this, and let's assume it has passed the check, but unfortunately, the writer thread lost cpu and will not be scheduled in a long time. The reader and other writers keep working.

At a moment when m_tailWriting happened to be equal to tailNext, and m_headReady happened to be m_tailWriting + 1, the writer got scheduled, as the queue-full check has been done, it will directly do CAS and try to enqueue, cause m_tailReady equals to m_headReady, cleared the queue.

Yl6482B5
  • 21
  • 2