
I am learning the OpenMP parallel processing library in C++. I felt that I had understood the basic concepts, so I tried to test my knowledge by implementing a linked-list queue that is consumed from multiple threads.

The challenge is not to consume the same node twice. So I considered sharing the queue between threads but allowing only a single thread at a time to update it (advance to the next node in the queue). For this purpose, I could use `critical` or a lock. However, even without them it seems to work perfectly; no race condition has occurred.

#include <cstdio>    // printf
#include <cstdlib>   // srand, rand
#include <iostream>
#include <omp.h>
#include <unistd.h>  // sleep


struct Node {
    int data;
    struct Node* next = NULL;
    Node() {}
    Node(int data) {
        this->data = data;
    }
    Node(int data, Node* node) {
        this->data = data;
        this->next = node;
    }
};

void processNode(Node *pNode);

struct Queue {
    Node *head = NULL, *tail = NULL;

    Queue& add(int data) {
        add(new Node(data));
        return *this;
    }

    void add(Node *node) {
        if (head == NULL) {
            head = node;
            tail = node;
        } else {
            tail->next = node;
            tail = node;
        }
    }

    Node* remove() {
        Node *node = head;
        if (head != NULL)
            head = head->next;

        return node;
    }

};

int main() {
    srand(12);
    Queue queue;
    for (int i = 0; i < 6; ++i) {
        queue.add(i);
    }

    double timer_started = omp_get_wtime();
    omp_set_num_threads(3);
    #pragma omp parallel
    {
        Node *n;
        while ((n = queue.remove()) != NULL) {
            double started = omp_get_wtime();
            processNode(n);
            double elapsed = omp_get_wtime() - started;
            printf("Thread id: %d data: %d, took: %f \n", omp_get_thread_num(), n->data, elapsed);
        }
    }
    double elapsed = omp_get_wtime() - timer_started;

    std::cout << "end. took " << elapsed << " in total " << std::endl;
    return 0;
}

void processNode(Node *node) {
    int r = rand() % 3 + 1; // between 1 and 3
    sleep(r);
}

Output looks like this:

Thread id: 0 data: 0, took: 1.000136 
Thread id: 2 data: 2, took: 1.000127 
Thread id: 2 data: 4, took: 1.000208 
Thread id: 1 data: 1, took: 3.001371 
Thread id: 0 data: 3, took: 2.001041 
Thread id: 2 data: 5, took: 2.004960 
end. took 4.00583 in total 

I've run this many times and with different numbers of threads, but I never observed a race condition or any other incorrect behavior. I expected that two different threads could call `remove` at the same time and process a single node twice, but that did not happen. Why?

https://github.com/muatik/openmp-examples/blob/master/linkedlist/main.cpp

Muatik
  • What are you expecting as a result? You don't seem to be modifying/writing back to a variable during the processing of the queue. If you don't modify anything, then you _should_ get the same result each time (except for the order in which threads activate, which you can't control or predict). If you perform some sort of operation which depends on the order the nodes are accessed, then you may experience different results. Or if you don't use any locks but every thread writes to the same variable, you could potentially get race conditions. I can't think of any examples right now, sorry. – Jarak Mar 11 '17 at 20:25
  • My understanding is that the threads share the queue. They call the queue's `remove` method, which advances the head pointer to the next node. My question is: is it possible for two threads to call `remove()` at the same time and receive the same node? – Muatik Mar 11 '17 at 20:38
  • Yes, it is possible. There is nothing to prevent both threads from completing `node = head;` before one can finish removing it. You have gotten lucky, or unlucky depending on how you look at it, so far. Try with a much longer queue. – user4581301 Mar 11 '17 at 21:02
  • Then what is the right way to prevent it? Should I make sure that only a single thread calls `remove` at a time, for instance by using locking? – Muatik Mar 11 '17 at 21:06
  • You could just split your data and provide separate lists for every thread. Concurrent calls to `remove` won't be a problem then. – Tomas Dittmann Mar 11 '17 at 21:56
  • Easiest would be a `std::mutex` member for the queue that is [locked with a `std::scoped_lock`](http://en.cppreference.com/w/cpp/thread/scoped_lock) at the top of the `remove` function. Maybe not most efficient, but easy. – user4581301 Mar 12 '17 at 00:58

1 Answer


First and foremost, you can never prove multi-threaded code to be correct through testing. Your hunch that you need a lock or a critical section is correct.

Your test is particularly easy on the queue. The following breaks your queue quickly:

for (int i = 0; i < 10000; ++i) {
    queue.add(i);
}

double timer_started = omp_get_wtime();
#pragma omp parallel
{
    size_t counter = 0;
    Node *n;
    while ((n = queue.remove()) != NULL) {
        processNode(n);
        counter++;
    }

    #pragma omp critical
    std::cout << "Thread " << omp_get_thread_num() << " processed " << counter << " nodes." << std::endl;
}

void processNode(Node *node) {}

This shows, for example, the following interesting result:

Thread 1 processed 11133 nodes.
Thread 0 processed 9039 nodes.

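Together the two threads report 20172 processed nodes although only 10000 were added, so nodes were clearly handed out more than once. But again, even if a queue ran this test code correctly a million times, that would not mean the queue is implemented correctly.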

In particular, it is not sufficient to protect only `remove`; you must properly protect each and every read and write of the queue data. To get an idea of how difficult this is to get right, watch this excellent talk by Herb Sutter.
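As a rough illustration of protecting every access with OpenMP's own primitives, here is a minimal sketch that assumes a single named critical section guarding both `add` and `remove`. The names `SafeQueue`, `consumeAll`, `queue_access` and the `visits` field are made up for this example and are not part of the original code. It is one simple option under these assumptions, not the most efficient one.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical node type for this sketch; `visits` counts how often
// the node was handed out by the queue.
struct Node {
    int data;
    int visits = 0;
    Node* next = nullptr;
    explicit Node(int d) : data(d) {}
};

// Hypothetical queue in which every read and write of head/tail happens
// inside the same named critical section.
struct SafeQueue {
    Node* head = nullptr;
    Node* tail = nullptr;

    void add(Node* node) {
        #pragma omp critical(queue_access)
        {
            if (head == nullptr) {
                head = node;
            } else {
                tail->next = node;
            }
            tail = node;
        }
    }

    Node* remove() {
        Node* node;
        #pragma omp critical(queue_access)
        {
            node = head;
            if (head != nullptr) {
                head = head->next;
                if (head == nullptr) tail = nullptr;  // queue is now empty
            }
        }
        return node;
    }
};

// Fills a queue with `count` nodes, drains it from all threads, and
// returns how many nodes were handed out exactly once.
std::size_t consumeAll(int count) {
    std::vector<Node*> nodes;
    SafeQueue queue;
    for (int i = 0; i < count; ++i) {
        nodes.push_back(new Node(i));
        queue.add(nodes.back());
    }

    #pragma omp parallel
    {
        Node* n;
        while ((n = queue.remove()) != nullptr) {
            // Atomic so that a duplicate hand-out, if it ever happened,
            // would be counted reliably.
            #pragma omp atomic
            n->visits++;
        }
    }

    std::size_t exactlyOnce = 0;
    for (Node* n : nodes) {
        if (n->visits == 1) ++exactlyOnce;
        delete n;
    }
    return exactlyOnce;
}
```

Calling `consumeAll(10000)` from `main` and comparing the result with 10000 gives a quick sanity check. As noted above, passing such a check does not prove the queue correct.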

Generally, I recommend using an existing parallel data structure, for example from Boost.Lockfree.

Unfortunately, however, OpenMP and C++11 lock / atomic primitives don't officially play well together. So strictly speaking, if you use OpenMP, you should stick to OpenMP synchronization primitives or libraries that use them.

Zulan
  • Thanks for your informative answer. Yes, the queue breaks with your code, and this is what I wanted to see. So if I modify my `remove()` method like `Node* remove() { Node *node; #pragma omp critical { node = head; if (head != NULL) head = head->next; } return node; }`, do the consumer threads then no longer process the same node? – Muatik Mar 12 '17 at 18:34
  • That works, but only if you don't add anything concurrently. In that case, though, there isn't much point in that kind of queue as opposed to a `std::vector`. – Zulan Mar 12 '17 at 18:43
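If all items exist before the parallel region starts, as the last comment suggests, the work can be sketched without a queue at all, assuming a `std::vector` and an OpenMP worksharing loop. The names `processItem` and `processAll` are hypothetical, and `schedule(dynamic)` is just one possible choice for work items of uneven duration.

```cpp
#include <vector>

// Hypothetical stand-in for the real per-item work.
int processItem(int value) {
    return value * 2;
}

// Processes every element exactly once. OpenMP distributes the loop
// iterations among the threads, so no lock is needed to hand out work;
// the reduction clause combines the per-thread partial sums safely.
long long processAll(const std::vector<int>& items) {
    const int n = static_cast<int>(items.size());
    long long sum = 0;
    #pragma omp parallel for schedule(dynamic) reduction(+ : sum)
    for (int i = 0; i < n; ++i) {
        sum += processItem(items[i]);
    }
    return sum;
}
```

Here the runtime assigns each index to exactly one thread, which is the guarantee the hand-written queue was trying to provide.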