
I was looking at the sample code for a lock-free queue at:

http://drdobbs.com/high-performance-computing/210604448?pgno=2

(It is also referenced in many SO questions, such as "Is there a production ready lock-free queue or hash implementation in C++".)

This looks like it should work for a single producer and a single consumer, although the code contains a number of typos. I've corrected the code as shown below, but it crashes. Does anybody have suggestions why?

In particular, should divider and last be declared as something like:

atomic<Node *> divider, last;    // shared

I don't have a compiler supporting C++0x on this machine, so perhaps that's all I need...

// Implementation from http://drdobbs.com/high-performance-computing/210604448
// Note that the code in that article (10/26/11) is broken.
// The attempted fixed version is below.

template <typename T>
class LockFreeQueue {
private:
    struct Node {
        Node( T val ) : value(val), next(0) { }
        T value;
        Node* next;
    };
    Node *first,      // for producer only
    *divider, *last;    // shared
public:
    LockFreeQueue()
    {
        first = divider = last = new Node(T()); // add dummy separator
    }
    ~LockFreeQueue()
    {
        while( first != 0 )    // release the list
        {
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }
    void Produce( const T& t )
    {
        last->next = new Node(t);    // add the new item
        last = last->next;      // publish it

        while (first != divider) // trim unused nodes
        {
            Node* tmp = first;
            first = first->next;
            delete tmp;
        }
    }
    bool Consume( T& result )
    {
        if (divider != last)         // if queue is nonempty
        {
            result = divider->next->value; // C: copy it back
            divider = divider->next;      // D: publish that we took it
            return true;                  // and report success
        }
        return false;                   // else report empty
    }
};

I wrote the following code to test this. Main (not shown) just calls TestQ().

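#include <cstdio>       // printf
#include <vector>       // std::vector
#include <pthread.h>    // pthread_t, pthread_create, pthread_join
#include "LockFreeQueue.h"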

const int numThreads = 1;
std::vector<LockFreeQueue<int> > q(numThreads);

void *Solver(void *whichID)
{
    int id = (long)whichID;
    printf("Thread %d initialized\n", id);
    int result = 0;
    do {
        if (q[id].Consume(result))
        {
            int y = 0;
            for (int x = 0; x < result; x++)
            { y++; }
            y = 0;
        }
    } while (result != -1);
    return 0;
}


void TestQ()
{
    std::vector<pthread_t> threads;
    for (int x = 0; x < numThreads; x++)
    {
        pthread_t thread;
        pthread_create(&thread, NULL, Solver, (void *)(long)x);  // widen to long before the int-to-pointer cast
        threads.push_back(thread);
    }
    for (int y = 0; y < 1000000; y++)
    {
        for (unsigned int x = 0; x < threads.size(); x++)
        {
            q[x].Produce(y);
        }
    }
    for (unsigned int x = 0; x < threads.size(); x++)
    {
        q[x].Produce(-1);
    }
    for (unsigned int x = 0; x < threads.size(); x++)
        pthread_join(threads[x], 0);    
}

Update: It turns out that the crash is caused by the queue declaration:

std::vector<LockFreeQueue<int> > q(numThreads);

When I change this to a simple array, it runs fine. (I implemented a version with locks and it was crashing too.) I see that the destructor is being called immediately after the constructor, resulting in doubly freed memory. But does anyone know WHY the destructor would be called immediately with a std::vector?
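A likely explanation, assuming a pre-C++11 standard library (consistent with not having a C++0x compiler): in C++03, `std::vector<T> q(n)` means `q(n, T())`. It default-constructs one temporary `LockFreeQueue`, copy-constructs every element from it, and then destroys the temporary. `LockFreeQueue` has no user-defined copy constructor, so the compiler-generated one copies the raw `first`, `divider` and `last` pointers, and every element ends up pointing at the dummy node that the temporary's destructor has just deleted. The sketch below uses a hypothetical instrumented type `Counted` to make that construct/copy/destroy sequence visible; it passes `Counted()` explicitly so the same copying also happens under C++11, where `q(n)` default-constructs each element instead.

```cpp
#include <cassert>
#include <vector>

// Hypothetical instrumented type: counts default constructions, copies and destructions.
struct Counted {
    static int constructed, copied, destroyed;
    Counted() { ++constructed; }
    Counted(const Counted&) { ++copied; }
    ~Counted() { ++destroyed; }
};
int Counted::constructed = 0;
int Counted::copied = 0;
int Counted::destroyed = 0;

// What C++03's std::vector<T> q(n) does: build one prototype T(),
// copy-construct n elements from it, then destroy the prototype.
void FillConstruct(int n) {
    assert(n >= 0);
    std::vector<Counted> q(n, Counted());
    // The temporary is already gone here: constructed == 1, copied == n, destroyed == 1.
    // With LockFreeQueue, that destroyed temporary owned the dummy node every copy points at.
}
```

Declaring the copy constructor and copy assignment operator private and leaving them undefined (C++03), or marking them `= delete` (C++11), turns the silent double free into a compile-time error. A plain array default-constructs each element in place and never copies, which is why it works.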

Nathan S.
  • @OliCharlesworth "malloc: *** error for object 0x100100a20: pointer being freed was not allocated" on the line "delete tmp;" – Nathan S. Oct 26 '11 at 23:04
  • Another simple way to implement a single-reader/single-writer queue, which I've used, is with 2 queues and an atomic 'swap buffer': queue `R` for reading, queue `W` for writing, and the swap buffer `S`, which is initially set to null. Usage: after the producer is done writing, exchange `W <-> S`. The consumer reads from `R` as usual, exchanging `R <-> S` if `R` is exhausted and trying one more read after the swap. Lastly, if a reader (or writer) encounters a nullptr _before_ reading (or writing), then exchange `R <-> S` (or `W <-> S`)... – pauluss86 Jan 30 '14 at 15:57
  • ... The advantage is simplicity; the disadvantage is that a consumer could repeatedly exchange empty queues if nothing was written between reads. In my version, I've added a `listen()` method that blocks the reader until it's notified by the writer. – pauluss86 Jan 30 '14 at 16:03

3 Answers


You'll need to make several of the pointers std::atomic, as you note, and you'll need to use compare_exchange_weak in a loop to update them atomically. Otherwise, multiple consumers might consume the same node and multiple producers might corrupt the list.
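The `compare_exchange_weak` retry loop mentioned here can be sketched on the simplest possible case: several producers pushing onto one shared list head. This is a hypothetical push-only list (a Treiber-style stack push), not a multi-producer version of the article's queue, and because it never pops it sidesteps the ABA and memory-reclamation problems a real multi-consumer structure has to solve. It assumes a C++11 compiler.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical push-only list shared by several producer threads.
class PushOnlyList {
private:
    struct Node {
        int value;
        Node* next;
    };
    std::atomic<Node*> head_;

public:
    PushOnlyList() : head_(nullptr) { }
    ~PushOnlyList() {
        Node* n = head_.load();
        while (n != nullptr) {
            Node* next = n->next;
            delete n;
            n = next;
        }
    }
    PushOnlyList(const PushOnlyList&) = delete;
    PushOnlyList& operator=(const PushOnlyList&) = delete;

    void Push(int v) {
        Node* n = new Node{v, head_.load(std::memory_order_relaxed)};
        // If another producer changed head_ since we read it, the CAS fails,
        // stores the current head into n->next, and we retry.
        while (!head_.compare_exchange_weak(n->next, n,
                                            std::memory_order_release,
                                            std::memory_order_relaxed)) {
        }
    }

    // Only meaningful once no Push is in flight.
    std::size_t Size() const {
        std::size_t count = 0;
        for (Node* n = head_.load(std::memory_order_acquire); n != nullptr; n = n->next)
            ++count;
        return count;
    }
};

// Starts 'threads' producers that each push 'perThread' items; returns the final size.
std::size_t RunPushDemo(int threads, int perThread) {
    assert(threads > 0 && perThread >= 0);
    PushOnlyList list;
    std::vector<std::thread> producers;
    for (int t = 0; t < threads; ++t)
        producers.emplace_back([&list, perThread] {
            for (int i = 0; i < perThread; ++i) list.Push(i);
        });
    for (std::thread& p : producers) p.join();
    return list.Size();
}
```

With a plain `Node*` head, two producers could read the same head and one of the pushes would be lost; with the loop, losing the race only costs another iteration.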

Chris Dodd
  • I'm only interested in the single producer/consumer case, but I will have to test the atomic code on another machine to verify it works. – Nathan S. Oct 26 '11 at 23:25

It's critically important that these writes (just one example from your code) occur in order:

last->next = new Node(t);    // add the new item
last = last->next;      // publish it

That's not guaranteed by C++ -- the optimizer can rearrange things however it likes, as long as the current thread always acts as-if the program ran exactly the way you wrote it. And then the CPU cache can come along and reorder things further.

You need memory fences. Making the pointers use the atomic type should have that effect.
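A minimal sketch of the question's queue with `divider` and `last` declared as `std::atomic<Node*>`. It assumes a C++11 compiler (which the asker did not have at the time) and exactly one producer thread and one consumer thread. The release store of `last` keeps the write of `next` from being moved after the publish, and the release store of `divider` keeps the consumer's read of the value from being moved after it tells the producer the node may be freed; each is paired with an acquire load on the other side. The class name `AtomicSpscQueue` and the demo function are hypothetical, not from the article.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Sketch: the article's single-producer/single-consumer queue with the shared
// pointers made atomic. Exactly one thread may call Produce, and one Consume.
template <typename T>
class AtomicSpscQueue {
private:
    struct Node {
        explicit Node(const T& val) : value(val), next(nullptr) { }
        T value;
        Node* next;               // written by the producer before 'last' is published
    };
    Node* first;                  // producer only
    std::atomic<Node*> divider;   // written by the consumer, read by the producer
    std::atomic<Node*> last;      // written by the producer, read by the consumer

public:
    AtomicSpscQueue() : first(new Node(T())), divider(first), last(first) { }  // dummy separator
    ~AtomicSpscQueue() {
        while (first != nullptr) {   // release the list
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }
    AtomicSpscQueue(const AtomicSpscQueue&) = delete;             // copies would share nodes
    AtomicSpscQueue& operator=(const AtomicSpscQueue&) = delete;

    void Produce(const T& t) {
        Node* tail = last.load(std::memory_order_relaxed);  // only this thread writes 'last'
        Node* node = new Node(t);
        tail->next = node;                                   // add the new item...
        last.store(node, std::memory_order_release);         // ...then publish it
        while (first != divider.load(std::memory_order_acquire)) {  // trim consumed nodes
            Node* tmp = first;
            first = first->next;
            delete tmp;
        }
    }

    bool Consume(T& result) {
        Node* d = divider.load(std::memory_order_relaxed);     // only this thread writes 'divider'
        if (d != last.load(std::memory_order_acquire)) {       // queue is nonempty
            result = d->next->value;                            // copy it out...
            divider.store(d->next, std::memory_order_release);  // ...then publish that we took it
            return true;
        }
        return false;
    }
};

// Hypothetical demo mirroring the question's test: one consumer thread, -1 as the sentinel.
// Returns the sum of the values received before the sentinel.
long long RunSpscDemo(int n) {
    AtomicSpscQueue<int> q;
    long long sum = 0;
    std::thread consumer([&q, &sum] {
        int expected = 0;
        int v = 0;
        for (;;) {
            if (!q.Consume(v)) continue;  // empty: spin and try again
            if (v == -1) break;
            assert(v == expected);        // items arrive in FIFO order
            ++expected;
            sum += v;
        }
    });
    for (int i = 0; i < n; ++i) q.Produce(i);
    q.Produce(-1);
    consumer.join();
    return sum;
}
```

Relaxed loads are enough for `last` in `Produce` and `divider` in `Consume` because each pointer has a single writer, and that writer is the thread doing the load.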

Ben Voigt
  • why will the compiler reorder these? As in, how does the compiler justify the reordering? – user1715122 Dec 04 '13 at 21:28
  • @user1715122: It's justified by the C++ memory model. Prior to that, it was justified by sequential consistency. Changes to non-volatile memory are side-effects that are only visible to the current thread, as far as the compiler is concerned. So it can write `temp1 = new Node(t); temp2 = last; last = temp1; temp2->next = temp1;` And even if the compiler doesn't, the CPU itself can take and release ownership of different cache lines out of order, as long as it owns the cache line when the write actually occurs. – Ben Voigt Dec 04 '13 at 22:20
  • Or the compiler could have a register caching `last`, and only read and write that register and never touch the memory location at all. Any optimization is justified by the as-if rule, unless it changes the visible side effects. There are no visible side effects here. – Ben Voigt Dec 04 '13 at 22:22
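The two failure modes described in these comments (the compiler keeping a value in a register, or reordering two writes) can be seen in the smallest publish/consume pattern. In this hypothetical sketch, a plain `int` is published through a `std::atomic<bool>` flag: the reader's loop keeps performing real atomic loads, and the release store forbids moving the earlier plain write after it, so a reader that sees the flag is guaranteed to see the data. It assumes a C++11 compiler; the function name is illustrative.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Publishes 'payload' to another thread through the 'ready' flag; returns what the reader saw.
int RunMessagePassing() {
    int payload = 0;                 // plain, non-atomic data
    std::atomic<bool> ready(false);  // publication flag
    int seen = 0;
    std::thread reader([&payload, &ready, &seen] {
        // Unlike a plain bool, 'ready' is atomic, so this loop keeps re-reading it.
        while (!ready.load(std::memory_order_acquire)) { }
        seen = payload;              // ordered after the acquire load above
        assert(seen == 42);
    });
    payload = 42;                                  // cannot be moved below...
    ready.store(true, std::memory_order_release);  // ...this release store
    reader.join();
    return seen;
}
```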

This could be totally off the mark, but I can't help but wonder whether you're having some sort of static initialization related issue... For laughs, try declaring q as a pointer to a vector of lock-free queues and allocating it on the heap in main().

mergeconflict