
I have a small, simple C++ program that creates a thread pool and then puts messages into a blocking queue shared between the threads to tell each thread what to do.

A message can be: -1 (end of stream -> terminate), -2 (barrier -> wait for all threads to reach it, then continue), or any other value, which triggers some arbitrary computation. The main loop proceeds in this order: some computation, barrier, some computation, barrier, ..., barrier, end of stream, thread join, exit.

I can't understand why I get a deadlock even with only 2 threads in the pool. The queue never becomes empty, even though the order in which I push and pop messages should always lead to an empty queue!

The blocking queue implementation is the one proposed here (C++ Equivalent to Java's BlockingQueue), with just two methods added. I have also copied the queue code below.

Any help?

Main.cpp

#include <iostream>
#include <vector>
#include <thread>
#include "Queue.hpp"

using namespace std;

// function executed by each thread
void f(int i, Queue<int> &q){
    while(1){
        // take a message from blocking queue
        int j= q.pop();
        // if it is end of stream then exit
        if (j==-1) break;
        // if it is barrier, wait for other threads to reach it
        if (j==-2){
            // active wait! BAD, but anyway...
            while(q.size() > 0){
                ;
            }
        }
        else{
            // random stuff
            int x = 0;
            for(int i=0;i<j;i++)
                x += 4;
        }
    }
}

int main(){
    Queue<int> queue; //blocking queue
    vector<thread> tids; // thread pool
    int nt = 2; // number of threads
    int dim = 8; // number to control number of operations

    // create thread pool, passing thread id and queue
    for(int i=0;i<nt;i++)
        tids.push_back(thread(f,i, std::ref(queue)));

    for(int dist=1; dist<=dim; dist++){ // without this outer loop the program works fine

        // push random number
        for(int j=0;j<dist;j++){    
            queue.push(4);  
        }

        // push barrier code
        for(int i=0;i<nt;i++){
            queue.push(-2);
        }

        // active wait! BAD, but anyway...
        while (queue.size()>0){
                 ;
        }
    }
    // push end of stream
    for(int i=0;i<nt;i++)
        queue.push(-1);
    // join thread pool
    for(int i=0;i<nt;i++){
        tids[i].join();
    }
    return 0;
}

Queue.hpp

#include <deque>
#include <mutex>
#include <condition_variable>

template <typename T>
class Queue
{
private:
  std::mutex              d_mutex;
  std::condition_variable d_condition;
  std::deque<T>           d_queue;
public:

  void push(T const& value) {
    {
      std::unique_lock<std::mutex> lock(this->d_mutex);
      d_queue.push_front(value);
    }
    this->d_condition.notify_one();
  }

  T pop() {
    std::unique_lock<std::mutex> lock(this->d_mutex);
    this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
    T rc(std::move(this->d_queue.back()));
    this->d_queue.pop_back();
    return rc;
  }

  bool empty(){
      std::unique_lock<std::mutex> lock(this->d_mutex); 
      return this->d_queue.empty(); 
  }

  int size(){
    std::unique_lock<std::mutex> lock(this->d_mutex); 
    return this->d_queue.size();
  }
};
Bobby

2 Answers


I think the problem is the active wait that you describe as "BAD, but anyway...", together with using the size of the queue as a barrier instead of a true synchronization barrier.

For dim = 1 you push 4, -2, -2 onto the queue. One thread will grab the 4 and a -2 while the other grabs the remaining -2. At this point the queue is empty and three threads (the two workers and the main thread) are all in an active wait, racing to see whether the queue has been emptied. The mutex in size() lets only one of them read the size at a time. If the main thread is scheduled first and sees that the queue is empty, it pushes -1, -1 to signal end of stream. Now the queue is no longer empty, but one or both of the worker threads are still waiting for it to become empty. Since they wait for an empty queue before taking another item, and nobody else will take those items, the program is deadlocked in this state.
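
To make that schedule concrete, here is a minimal single-threaded sketch that replays it by hand on a plain std::deque used the same way as the Queue class (push_front / pop_back). The function name replay_dim1_schedule is made up for illustration, and the ordering of the steps is an assumed unlucky interleaving, not something the real program is guaranteed to hit on every run.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical helper: replays one unlucky schedule for dim = 1, nt = 2.
// Single-threaded on purpose, so the interleaving is fixed and repeatable.
// Returns the number of messages left in the queue that nobody will pop.
std::size_t replay_dim1_schedule() {
    std::deque<int> q;  // same usage as Queue: push_front / pop_back

    // main: one work item, then one barrier marker per worker
    q.push_front(4);
    q.push_front(-2);
    q.push_front(-2);

    q.pop_back();  // worker A pops 4 and does its computation
    q.pop_back();  // worker A pops -2, but has not called size() yet
    q.pop_back();  // worker B pops -2, but has not called size() yet

    // main: its spin loop sees an empty queue and moves on
    assert(q.empty());
    q.push_front(-1);
    q.push_front(-1);

    // Only now do A and B evaluate `while (q.size() > 0)`. The condition is
    // true and stays true, because both consumers are spinning, not popping.
    return q.size();
}
```

In this replay the function returns 2: the two -1 messages sit in the queue forever while both workers spin on size() > 0 and main blocks in join().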

For the case where dim > 1 there is likely a similar issue: the main thread pushes the next set of values into the queue before both workers have observed the empty queue and left the active wait.
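
As a rough idea of what a "true" barrier could look like when written from scratch, here is a minimal sketch built on std::mutex and std::condition_variable. The Barrier class, its arrive_and_wait method and the run_phases helper are hypothetical names, not part of the original program, and the generation counter is just one common way to make the barrier reusable across phases. Treat it as a sketch under those assumptions rather than a drop-in fix.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reusable barrier (not part of the original code).
class Barrier {
public:
    explicit Barrier(int parties)
        : parties_(parties), waiting_(0), generation_(0) {
        assert(parties > 0);
    }

    // Blocks until `parties` threads have called this, then releases them all.
    void arrive_and_wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        const unsigned long my_generation = generation_;
        if (++waiting_ == parties_) {
            // Last arrival: reset for the next phase and wake everyone.
            waiting_ = 0;
            ++generation_;
            cv_.notify_all();
        } else {
            // Sleep until the generation changes; no spinning on queue size.
            cv_.wait(lock, [&] { return generation_ != my_generation; });
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    const int parties_;
    int waiting_;
    unsigned long generation_;
};

// Hypothetical check: runs `workers` threads through `phases` phases and
// returns true if, after each barrier, every worker had finished that phase.
bool run_phases(int workers, int phases) {
    Barrier barrier(workers);
    std::atomic<int> done(0);
    std::atomic<bool> ok(true);
    std::vector<std::thread> pool;
    for (int w = 0; w < workers; ++w) {
        pool.emplace_back([&] {
            for (int p = 0; p < phases; ++p) {
                ++done;                     // stand-in for this phase's work
                barrier.arrive_and_wait();  // wait for the other workers
                if (done.load() < workers * (p + 1)) ok = false;
            }
        });
    }
    for (auto& t : pool) t.join();
    return ok.load();
}
```

In the original program a worker would call something like arrive_and_wait() when it pops -2 instead of spinning on q.size(). If the main thread also needs to wait for the phase to finish, it could be counted as one extra party and call the same method instead of polling the queue.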

user1593858
  • Thank you for your explanation, I now understand the problem better. As @KorelK pointed out, the problem is not in the active wait but in the main thread pushing the barrier number (-2). I will consider using pthread synchronization as you suggested, but in this case I just wanted to implement such a mechanism from scratch. – Bobby Jun 02 '18 at 07:57
  • Removing the -2 doesn't really solve your deadlock issue it simply avoids the problem altogether – user1593858 Jun 02 '18 at 15:32

I ran your code and I understand the problem. The problem is with the "-2" option. By the time the two threads reach that branch, your main thread may already have pushed more values onto the queue. So if the queue grows between the moment your threads pop the "-2" value and the moment they start checking the size, your code gets stuck:

Thread 1: get -2.
Thread 2: get -2.
Thread main: push -1.
Thread main: push -1.
Thread 1: wait until the whole queue is empty.
Thread 2: wait until the whole queue is empty.

queue: -1 -1

^ That is the queue state when dim equals 1. In your code dim equals 8, and the picture gets much messier. To avoid the problem, all I did was disable the following loop:

for(int i=0;i<nt;i++){
    queue.push(-2);
}

With this part disabled, the code runs to completion. This is how I checked it:

std::mutex guarder;

// function executed by each thread
void f(int i, Queue<int> &q){
    while(1){
        // take a message from blocking queue
        guarder.lock();
        int j= q.pop();
        guarder.unlock();
        // if it is end of stream then exit
        if (j==-1) break;
        // if it is barrier, wait for other threads to reach it
        if (j==-2){
            // active wait! BAD, but anyway...
            while(q.size() > 0){
                ;
            }
        }
        else{
            // random stuff
            int x = 0;
            for(int i=0;i<j;i++)
                x += 4;
            guarder.lock();
            cout << x << std::endl;
            guarder.unlock();
        }
    }
}

int main(){
    Queue<int> queue; //blocking queue
    vector<thread> tids; // thread pool
    int nt = 2; // number of threads
    int dim = 8; // number to control number of operations

    // create thread pool, passing thread id and queue
    for(int i=0;i<nt;i++)
        tids.push_back(thread(f,i, std::ref(queue)));

    for(int dist=1; dist<=dim; dist++){ // without this outer loop the program works fine

        // push random number
        for(int j=0;j<dist;j++){
            queue.push(dist);
        }

        /*// push barrier code
        for(int i=0;i<nt;i++){
            queue.push(-2);
        }*/

        // active wait! BAD, but anyway...
        while (queue.size()>0){
            ;
        }
    }
    // push end of stream
    for(int i=0;i<nt;i++)
        queue.push(-1);
    // join thread pool
    for(int i=0;i<nt;i++){
        tids[i].join();
    }
    return 0;
}

The result:

4
8
8
12
12
12
16
16
16
20
20
16
20
20
20
24
24
24
24
24
24
28
28
28
28
28
28
28
32
32
32
32
32
32
32
32

BTW, the hang did not occur because of your "active wait" part. Active waiting is not good, but it usually causes other problems (like slowing down your system).

Coral Kashri
  • You are totally right, your fix works perfectly. Active wait was just to make code simpler, I plan to use condition variable. Thank you! – Bobby Jun 02 '18 at 08:00
  • This defeats the purpose of the code as described by the OP. The point of the -2 was to synchronize threads and make them all reach the barrier before continuing. By removing the push of -2 you allow the threads to run unimpeded and nothing stops one thread from starting on the next dimension before the other is done with the current dimension. Active wait in and of itself was not the problem. The problem was in using the size of the queue as the exit condition – user1593858 Jun 02 '18 at 15:28
  • The target was to solve the problem, and this is exactly what I did. I didn't suggest another algorithm to make "-2" work. You can specify the index of the thread that you want to complete the mission, or you can test whether all the threads are in the "-2" position, so you'll know when to pass all of the threads on to the next mission. – Coral Kashri Jun 02 '18 at 16:24