I was trying to implement a master-worker model using the C++11 synchronization features for practice. The model uses a std::queue object along with a condition variable and some mutexes. The master thread puts tasks in the queue, and the worker threads pop tasks off the queue and "process" them.

The code I have works properly (unless I've missed some race conditions) when I don't terminate the worker threads. However, the program then never ends until you manually terminate it with Ctrl+C. I have some code to terminate the workers after the master thread finishes. Unfortunately, this doesn't work properly: on some runs it skips the last task.

So my question: Is it possible to safely and properly terminate worker threads after all tasks have been processed?

This is just a proof of concept and I'm new to C++11 features, so I apologize for my style. I appreciate any constructive criticism.

EDIT: nogard has kindly pointed out that this implementation of the model makes it quite complicated and showed me that what I'm asking for is pointless since a good implementation will not have this problem. Thread pools are the way to go in order to implement this properly. Also, I should be using an std::atomic instead of a normal boolean for worker_done (Thanks Jarod42).

#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>

//To sleep
#include <unistd.h>

struct Task
{
    int taskID;
};
typedef struct Task task;


//cout mutex
std::mutex printstream_accessor;

//queue related objects
std::queue<task> taskList;
std::mutex queue_accessor;
std::condition_variable cv;

//worker flag
bool worker_done = false;

//It is acceptable to call this on a lock only if you poll - you will get an inaccurate answer otherwise
//Will return true if the queue is empty, false if not
bool task_delegation_eligible()
{
    return taskList.empty();
}

//Thread safe cout function
void safe_cout(std::string input)
{
    // Apply a stream lock and state the calling thread information then print the input
    std::unique_lock<std::mutex> cout_lock(printstream_accessor);   
    std::cout << "Thread:" << std::this_thread::get_id() << " " << input << std::endl;
}//cout_lock destroyed, therefore printstream_accessor mutex is unlocked

void worker_thread()
{
    safe_cout("worker_thread() initialized");
    while (!worker_done)
    {
        task getTask;
        {
            std::unique_lock<std::mutex> q_lock(queue_accessor);
            cv.wait(q_lock, 
            []
                {   //predicate that will check if available
                    //using a lambda function to apply the ! operator
                    if (worker_done)
                        return true;

                    return !task_delegation_eligible();
                }
            );

            if (!worker_done)
            {
                //Remove task from the queue
                getTask = taskList.front();
                taskList.pop();
            }
        }

        if (!worker_done)
        {
            //process task
            std::string statement = "Processing TaskID:";
            std::stringstream convert;
            convert << getTask.taskID;
            statement += convert.str();

            //print task information
            safe_cout(statement);

            //"process" task
            usleep(5000);
        } 
    }
}

/**
 * master_thread(): 
 * This thread is responsible for creating task objects and pushing them onto the queue
 * After this, it will notify all other threads who are waiting to consume data
 */
void master_thread()
{
    safe_cout("master_thread() initialized");

    for (int i = 0; i < 10; i++)
    {
        //The following 2 lines are needed if you don't want this thread to bombard the queue with tasks before a task can be processed
        while (!task_delegation_eligible() )    //task_delegation_eligible() is true IFF queue is empty
            std::this_thread::yield();          //yield execution to other threads (if there are tasks on the queue)

        //create a new task
        task newTask;
        newTask.taskID = (i+1);

        //lock the queue then push
        {
            std::unique_lock<std::mutex> q_lock(queue_accessor);
            taskList.push(newTask);
        }//unique_lock destroyed here

        cv.notify_one();
    }

    safe_cout("master_thread() complete");
}

int main(void)
{
    std::thread MASTER_THREAD(master_thread);   //create a thread object named MASTER_THREAD and have it run the function master_thread()
    std::thread WORKER_THREAD_1(worker_thread);
    std::thread WORKER_THREAD_2(worker_thread);
    std::thread WORKER_THREAD_3(worker_thread);

    MASTER_THREAD.join();

    //wait for the queue tasks to finish
    while (!task_delegation_eligible());    //busy-wait until the queue is empty

    /** 
     * Following 2 lines
     * Terminate worker threads => this doesn't work as expected.
     * The model is fine as long as you don't try to stop the worker 
     * threads like this as it might skip a task, however this program
     * will terminate
     */
    worker_done = true;
    cv.notify_all();

    WORKER_THREAD_1.join();
    WORKER_THREAD_2.join();
    WORKER_THREAD_3.join();

    return 0;
}

Thanks a lot

Cal
  • `worker_done` should be a `std::atomic`, and you may replace `usleep` by `std::this_thread::sleep_for`. – Jarod42 Jul 09 '14 at 07:40
  • Thank you Jarod, I will change worker_done to be of type std::atomic and replace usleep with the proper threaded implementation of the sleep call – Cal Jul 09 '14 at 13:51

1 Answer

There is a visibility issue in your program: the change to the worker_done flag made in one thread might not be observed by a worker thread. To guarantee that the result of one action is visible to a second action in another thread, you have to use some form of synchronization. To fix this, you can use std::atomic, as proposed by Jarod42.
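A minimal sketch of that fix, assuming the flag is declared as `std::atomic<bool>` and that `usleep` is replaced by `std::this_thread::sleep_for` as suggested in the comments. The function `run_until_stopped` and its iteration counter are hypothetical and exist only to make the example self-contained:

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Stand-in for the question's worker_done flag, now atomic.
std::atomic<bool> worker_done{false};

// Hypothetical helper: starts one worker that loops until the flag is set,
// then returns how many loop iterations the worker completed.
long run_until_stopped()
{
    long iterations = 0;
    std::thread worker([&iterations] {
        // With a plain bool, reading the flag here while another thread
        // writes it would be a data race (undefined behaviour).
        while (!worker_done.load()) {
            ++iterations;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    });

    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    worker_done.store(true);   // the store is visible to the worker's next load
    worker.join();             // join synchronizes, so reading iterations is safe
    return iterations;
}
```

This only addresses visibility of the flag. It does not by itself guarantee that a task already popped from the queue is processed before the worker sees the flag.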

If you are writing this program for practice, that's fine, but for real applications you could benefit from an existing thread pool, which would greatly simplify your code.
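To illustrate the idea, here is a rough sketch of a hand-rolled pool, not an existing library. The names `SimplePool`, `submit`, and `run_tasks` are invented for this example. It assumes the skipped task in the question comes from re-checking the flag after a task has already been popped, so the worker here exits only when shutdown was requested and the queue is empty:

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool; not taken from any library.
class SimplePool
{
public:
    explicit SimplePool(unsigned workers)
    {
        for (unsigned i = 0; i < workers; ++i)
            threads_.emplace_back([this] { run(); });
    }

    // Requests shutdown and joins; tasks still in the queue are executed first.
    ~SimplePool()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            done_ = true;      // written under the same mutex the workers hold
        }
        cv_.notify_all();
        for (auto& t : threads_)
            t.join();
    }

    void submit(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void run()
    {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return done_ || !tasks_.empty(); });
                if (tasks_.empty())
                    return;    // reached only when done_ is set and nothing is left
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();            // a popped task is always run, outside the lock
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool done_ = false;        // protected by mutex_, so a plain bool suffices
    std::vector<std::thread> threads_;
};

// Hypothetical driver: runs `count` trivial tasks on three workers and
// returns how many were actually executed.
int run_tasks(int count)
{
    std::atomic<int> processed{0};
    {
        SimplePool pool(3);
        for (int i = 0; i < count; ++i)
            pool.submit([&processed] { ++processed; });
    }   // the destructor drains the queue and joins the workers
    return processed.load();
}
```

Because the shutdown flag is only read and written while holding the queue mutex, the condition variable's predicate and the flag stay consistent, and no separate busy-wait on the queue is needed before shutting down.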

nogard
  • Thank you very much nogard. I will implement the thread pool as you recommended. I really appreciate your advice – Cal Jul 09 '14 at 13:50