3

I was trying to write code for the Producer-Consumer problem. The code below works fine most of the time, but it sometimes gets stuck because of a "Lost Wake-up" (I guess). I tried a thread sleep(), but it didn't help. What modification is needed to handle this case in my code? Can a semaphore be helpful here? If yes, how would I implement one here?

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <iostream>

using namespace std;

int product = 0;
boost::mutex mutex;
boost::condition_variable cv;
boost::condition_variable pv;
bool done = false;

void consumer(){
    while(done==false){
        //cout << "start c" << endl
        boost::mutex::scoped_lock lock(mutex);
        cv.wait(lock);
        //cout << "wakeup c" << endl;
        if (done==false)
        {
            cout << product << endl;
            //cout << "notify c" << endl;
            pv.notify_one();
        }
        //cout << "end c" << endl;
    }
}

void producer(){
    for(int i=0;i<10;i++){
        //cout << "start p" << endl;
        boost::mutex::scoped_lock lock(mutex);
        boost::this_thread::sleep(boost::posix_time::microseconds(50000));
        ++product;
        //cout << "notify p" << endl;
        cv.notify_one();
        pv.wait(lock);
        //cout << "wakeup p" << endl;
    }
    //cout << "end p" << endl;
    cv.notify_one();
    done = true;
}

int main()
{
    int t = 1000;
    while(t--){
        /*
        This is not perfect, and is prone to a subtle issue called the lost wakeup (for example, producer calls notify() 
        on the condition, but client hasn't really called wait() yet, then both will wait() indefinitely.) 
        */
        boost::thread consumerThread(&consumer);    
        boost::thread producerThread(&producer);

        producerThread.join();
        consumerThread.join();
        done =false;
        //cout << "process end" << endl;
    }
    cout << "done" << endl;
    getchar();
    return 0;
}
UmNyobe
Nishant Kumar
  • Check my answer on how to deal with condition variables: http://stackoverflow.com/a/5538447/104774. They should not be used as signals, only to notify that a condition has changed. – stefaanv Dec 02 '13 at 10:01

2 Answers

1

Yes, you want a way to know (in the consumer) that you "missed" a signal. A semaphore can help. There's more than one way to skin a cat, so here's my simple take on it (using only C++11 standard library features):

class semaphore
{
private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;

public: 
    semaphore(int count_ = 0) : count(count_) { }

    void notify()
    {
        std::unique_lock<std::mutex> lck(mtx);
        ++count;
        cv.notify_one();
    }

    void wait() { return wait([]{}); }  // no-op action

    template <typename F>
    auto wait(F&& func = []{}) -> decltype(std::declval<F>()())
    {
        std::unique_lock<std::mutex> lck(mtx);

        while(count == 0){
            cv.wait(lck);
        }
        count--;

        return func();
    }
};

For convenience, I added a wait() overload that takes a function to be executed under the lock. This lets the consumer operate the 'semaphore' without ever manually handling the lock (and still read the value of product without data races):

semaphore sem;

void consumer() {
    do {
        bool stop = false;
        int received_product = sem.wait([&stop] { stop = done; return product; });

        if (stop)
            break;

        std::cout << received_product << std::endl;

        std::unique_lock<std::mutex> lock(processed_mutex);
        processed_signal.notify_one();
    } while(true);
}
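
The reason a semaphore sidesteps the lost wake-up is that notify() records the event in a counter, so a wait() that arrives late still finds it. Here is a minimal sketch of that property, assuming C++11; the names counting_sem and early_notify_demo are made up for illustration and are not part of the answer's code:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Stripped-down counting semaphore (hypothetical name), same idea as above.
class counting_sem {
    std::mutex mtx;
    std::condition_variable cv;
    int count = 0;
public:
    void notify() {
        std::lock_guard<std::mutex> lck(mtx);
        ++count;              // the event is remembered even if nobody waits yet
        cv.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> lck(mtx);
        // The predicate form re-checks the counter after every wake-up.
        cv.wait(lck, [this] { return count > 0; });
        --count;
    }
};

// Hypothetical helper: posts n notifications BEFORE any waiter exists, then
// starts a thread that waits n times. Returns the number of completed waits.
int early_notify_demo(int n) {
    counting_sem sem;
    for (int i = 0; i < n; ++i) sem.notify();
    int completed = 0;
    std::thread waiter([&sem, &completed, n] {
        for (int i = 0; i < n; ++i) { sem.wait(); ++completed; }
    });
    waiter.join();
    return completed;
}
```

With a bare condition variable the early notifications would have been dropped and the waiter would block; here every wait() completes because the counter carries the state.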

A fully working demo (Live on Coliru):

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <cassert>

class semaphore
{
private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;

public: 
    semaphore(int count_ = 0) : count(count_) { }

    void notify()
    {
        std::unique_lock<std::mutex> lck(mtx);
        ++count;
        cv.notify_one();
    }

    void wait() { return wait([]{}); }  // no-op action

    template <typename F>
    auto wait(F&& func = []{}) -> decltype(std::declval<F>()())
    {
        std::unique_lock<std::mutex> lck(mtx);

        while(count == 0){
            cv.wait(lck);
        }
        count--;

        return func();
    }
};

semaphore sem;

int product = 0;
std::mutex processed_mutex;
std::condition_variable processed_signal;

bool done = false;

void consumer(int check) {
    do {
        bool stop = false;
        int received_product = sem.wait([&stop] { stop = done; return product; });

        if (stop)
            break;

        std::cout << received_product << std::endl;
        ++check;  // keep the side effect outside assert(), which compiles away under NDEBUG
        assert(check == received_product);

        std::unique_lock<std::mutex> lock(processed_mutex);
        processed_signal.notify_one();
    } while(true);
}

void producer() {
    std::unique_lock<std::mutex> lock(processed_mutex);
    for(int i = 0; i < 10; ++i) {
        ++product;
        sem.notify();
        processed_signal.wait(lock);
    }
    done = true;
    sem.notify();
}

int main() {
    int t = 1000;
    while(t--) {
        std::thread consumerThread(&consumer, product);
        std::thread producerThread(&producer);
        producerThread.join();
        consumerThread.join();
        done = false;
        std::cout << "process end" << std::endl;
    }
    std::cout << "done" << std::endl;
}
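
One caveat, as far as I can tell: processed_signal.wait(lock) is called without a predicate, and a condition variable is allowed to wake up spuriously, in which case the producer could move on before the consumer has printed. A possible variation is to use a second semaphore for the "processed" direction, so both directions are backed by a counter. This is only a sketch under that assumption; item_ready, item_taken and ping_pong are hypothetical names:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

class semaphore {
    std::mutex mtx;
    std::condition_variable cv;
    int count = 0;
public:
    void notify() {
        std::lock_guard<std::mutex> lck(mtx);
        ++count;
        cv.notify_one();
    }
    void wait() {
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, [this] { return count > 0; });  // immune to spurious wake-ups
        --count;
    }
};

// Hypothetical helper: runs one producer/consumer round trip of n items and
// returns the values the consumer observed, in order.
std::vector<int> ping_pong(int n) {
    semaphore item_ready;   // producer -> consumer
    semaphore item_taken;   // consumer -> producer
    int product = 0;
    bool done = false;
    std::vector<int> seen;

    std::thread consumer([&] {
        for (;;) {
            item_ready.wait();
            if (done) break;          // ordered after the producer's write by the semaphore
            seen.push_back(product);
            item_taken.notify();
        }
    });
    std::thread producer([&] {
        for (int i = 0; i < n; ++i) {
            ++product;
            item_ready.notify();
            item_taken.wait();        // do not touch product until it was consumed
        }
        done = true;
        item_ready.notify();
    });

    producer.join();
    consumer.join();
    return seen;
}
```

Each access to product and done is separated from the other thread's access by a notify()/wait() pair on the same semaphore, so no extra lock is needed around them.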
sehe
  • the processed_signal notification can still get lost – stefaanv Dec 02 '13 at 12:01
  • @stefaanv I'd appreciate if you demonstrated how. I think the `processed_mutex` amply protects that. The consumer can't possibly signal the `processed_signal` (a slight misnomer, btw :)) until it gets the mutex: hence, by definition the producer is waiting on the condition variable first. – sehe Dec 02 '13 at 12:14
  • You're right, although this mainly works because of the lock around the complete producer thread, which in most cases defeats the purpose of having threads, but here it is okay. I didn't spot that. – stefaanv Dec 02 '13 at 13:46
  • @stefaanv Actually, that lock is _logically_ not a lock here. It's a synchronization primitive. Think of it as a "negative critical-section"; it is 'opened up' while waiting for the condition variable. Sure it could be tuned, but that's for the OP to decide – sehe Dec 02 '13 at 13:49
-1

You seem to ignore that the variable done is also shared state, to the same extent as product. This can lead to several race conditions. In your case, I see at least one scenario where consumerThread makes no progress:

  1. The loop executes as intended
  2. consumer executes and is waiting at cv.wait(lock);
  3. producer has finished the for loop, notifies consumer, and is preempted
  4. consumer wakes up, reads done == false, outputs product, reads done == false again, and waits on the condition
  5. producer sets done to true and exits
  6. consumer is stuck forever

To avoid these kinds of issues, you should hold a lock whenever you read or write done. By the way, your implementation is quite sequential, i.e. the producer and the consumer can only process a single piece of data at a time...
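
As a rough sketch of that advice applied to the question's code (written with the C++11 standard library instead of Boost, which is my substitution): done and a new ready flag are only touched under the mutex, and every wait uses a predicate, so a notification that arrives before the wait is not lost. The channel struct, the ready flag, and run_once are hypothetical names introduced for this example:

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

struct channel {                            // hypothetical grouping of the shared state
    std::mutex m;
    std::condition_variable item_ready_cv;  // plays the role of cv in the question
    std::condition_variable item_taken_cv;  // plays the role of pv in the question
    int product = 0;
    bool ready = false;                     // an item is waiting to be consumed
    bool done = false;                      // producer has finished
};

void producer(channel& ch, int n) {
    for (int i = 0; i < n; ++i) {
        std::unique_lock<std::mutex> lock(ch.m);
        ++ch.product;
        ch.ready = true;
        ch.item_ready_cv.notify_one();
        // Sleep until the consumer has cleared the flag.
        ch.item_taken_cv.wait(lock, [&ch] { return !ch.ready; });
    }
    std::lock_guard<std::mutex> lock(ch.m);
    ch.done = true;                         // written under the lock, then announced
    ch.item_ready_cv.notify_one();
}

void consumer(channel& ch, std::vector<int>& seen) {
    for (;;) {
        std::unique_lock<std::mutex> lock(ch.m);
        // The predicate is checked before sleeping, so an earlier notify is not missed.
        ch.item_ready_cv.wait(lock, [&ch] { return ch.ready || ch.done; });
        if (!ch.ready) return;              // done, and nothing left to consume
        seen.push_back(ch.product);
        ch.ready = false;
        ch.item_taken_cv.notify_one();
    }
}

// Hypothetical driver: one producer/consumer run; returns what the consumer saw.
std::vector<int> run_once(int n) {
    channel ch;
    std::vector<int> seen;
    std::thread c(consumer, std::ref(ch), std::ref(seen));
    std::thread p(producer, std::ref(ch), n);
    p.join();
    c.join();
    return seen;
}
```

The design is still strictly sequential, as noted above; replacing the single ready flag with a queue would let the producer run ahead of the consumer.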

UmNyobe
  • Even if you move all reads/writes of `done` inside the critical section (trivial: [just move the `scoped_lock`s outside the loop](http://coliru.stacked-crooked.com/a/b330b0ee81c3e6ea)), the deadlock can still happen – sehe Dec 02 '13 at 10:14