13

In a distributed job system written in C++11 I have implemented a fence (i.e. a thread outside the worker thread pool may ask to block until all currently scheduled jobs are done) using the following structure:

struct fence
{
    std::atomic<size_t>                     counter;
    std::mutex                              resume_mutex;
    std::condition_variable                 resume;

    fence(size_t num_threads)
        : counter(num_threads)
    {}
};

The code implementing the fence looks like this:

void task_pool::fence_impl(void *arg)
{
    auto f = (fence *)arg;
    if (--f->counter == 0)      // (1)
        // we have zeroed this fence's counter, wake up everyone that waits
        f->resume.notify_all(); // (2)
    else
    {
        unique_lock<mutex> lock(f->resume_mutex);
        f->resume.wait(lock);   // (3)
    }
}

This works very well if threads enter the fence spread out over a period of time. However, if they arrive almost simultaneously, the following can happen: between the atomic decrement (1) and the start of the wait on the condition variable (3), a thread yields the CPU, and another thread decrements the counter to zero (1) and signals the condition variable (2). The first thread then waits forever in (3), because it only starts waiting after the notification has already been delivered.

A hack to make the thing workable is to put a 10 ms sleep just before (2), but that's unacceptable for obvious reasons.

Any suggestions on how to fix this in a performant way?

IneQuation

2 Answers

14

Your diagnosis is correct: this code is prone to losing condition-variable notifications in exactly the way you describe. That is, after one thread has locked the mutex but before it starts waiting on the condition variable, another thread may call notify_all(), so the first thread misses that notification.

A simple fix is to lock the mutex before decrementing the counter and while notifying:

void task_pool::fence_impl(void *arg)
{
    auto f = static_cast<fence*>(arg);
    std::unique_lock<std::mutex> lock(f->resume_mutex);
    if (--f->counter == 0) {
        f->resume.notify_all();
    }
    else do {
        f->resume.wait(lock);
    } while(f->counter);
}

In this case the counter need not be atomic.

An added bonus (or penalty, depending on your point of view) of locking the mutex before notifying is this guarantee from the POSIX specification of pthread_cond_broadcast():

The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().

Regarding the while loop, from the POSIX specification of pthread_cond_wait():

Spurious wakeups from the pthread_cond_timedwait() or pthread_cond_wait() functions may occur. Since the return from pthread_cond_timedwait() or pthread_cond_wait() does not imply anything about the value of this predicate, the predicate should be re-evaluated upon such return.

Maxim Egorushkin
  • `unique_lock` is missing its required template argument: `unique_lock<mutex>`, both in the question and the answer. But otherwise I agree, +1. – Howard Hinnant Jan 07 '14 at 23:07
  • Oh, there's a `typedef` inside the class (`fence` is a nested structure), but I've added the template argument for readability, thanks. – IneQuation Jan 08 '14 at 05:56
  • @Maxim Yegorushkin, why did you add a `while(f->counter)` statement to that last block? – IneQuation Jan 08 '14 at 05:59
  • @IneQuation I added a note regarding the while loop for you. – Maxim Egorushkin Jan 08 '14 at 09:38
  • Okay, so that deals with the pthread implementation on *nix. Is this also needed on Windows? I know it won't hurt, I'm now asking for the necessity of this. – IneQuation Jan 08 '14 at 09:45
  • @IneQuation It is required in portable C++ code, see http://en.cppreference.com/w/cpp/thread/condition_variable/wait Not sure how condition_variable is implemented on Windows, you can probably look it up in Boost condition variable sources. – Maxim Egorushkin Jan 08 '14 at 09:47

-1

In order to keep the higher performance of an atomic operation instead of a full mutex, you should change the wait into a lock, check, and loop.

All condition waits should be done that way. The condition variable even has a two-argument overload of wait() that takes a predicate function or lambda.

The code might look like:

void task_pool::fence_impl(void *arg)
{
    auto f = (fence *)arg;
    if (--f->counter == 0)      // (1)
        // we have zeroed this fence's counter, wake up everyone that waits
        f->resume.notify_all(); // (2)
    else
    {
        unique_lock<mutex> lock(f->resume_mutex);
        while(f->counter) {
            f->resume.wait(lock);   // (3)
        }
    }
}

Zan Lynx
  • As far as I can tell, you only added a loop to catch the spurious wake-ups. This does not fix the problem I have originally described - a thread going for (3) may still yield, letting another thread reach (2) before the original one finally gets to (3). – IneQuation Jan 08 '14 at 09:43
  • Apart from that, by "lock, check and loop" do you mean a spinlock? – IneQuation Jan 08 '14 at 09:44