I have a WorkDispatcher class which holds Worker class objects as properties and launches their functions in new threads. Here is an example:

WorkDispatcher.h:

class WorkDispatcher
{

private:
    std::thread m_reconstructionThread;
    std::shared_ptr <Reconstruction::RGBDImageModel> m_currentRGBD;

public:
    WorkDispatcher();

    std::mutex m_rgbdMutex, m_commandMutex;
    std::deque<Network::NetworkCommandModel> m_CommandQueue;
    std::condition_variable m_RgbConditional, m_CommandConditional;

    Reconstruction::SceneReconstructor m_Reconstructor;

    void Initialize();
    void Work();
    void Stop();

};

WorkDispatcher.cpp:

void WorkDispatcher::Work()
{    
    m_reconstructionThread = std::thread(
        &Reconstruction::SceneReconstructor::Reconstruct, 
        std::ref(m_Reconstructor),
        std::ref(m_currentRGBD),
        std::ref(m_CommandQueue),
        std::ref(m_rgbdMutex), 
        std::ref(m_RgbConditional), 
        std::ref(m_commandMutex), 
        std::ref(m_CommandConditional)
    );
}

These functions contain infinite loops, and I use the condition variables to wait until work is available. For example, my Reconstruct function:

void SceneReconstructor::Reconstruct(
    std::shared_ptr<RGBDImageModel>& currentImage,
    std::deque<Network::NetworkCommandModel> commandQueue,
    std::mutex& rgbdMutex,
    std::condition_variable& rgbdCond,
    std::mutex& commandMutex,
    std::condition_variable& commandConditional)
{
    while (true)
    {
        std::unique_lock<std::mutex> rgbdLocker(rgbdMutex);
        rgbdCond.wait(rgbdLocker, [this] {return m_QuitReconstructionFlag; });
        
        // Quit flag to break out of loop
        if (m_QuitReconstructionFlag)
            break;

        // do stuff here

    }
}

So far so good. However, if I want to quit the application I need to quit all of my worker threads. As seen above, these classes have a quit flag for this, which I use as follows:

void WorkDispatcher::Stop()
{
    // for all worker threads do this
    m_Reconstructor.m_QuitReconstructionFlag = true;
    if (m_reconstructionThread.joinable())
        m_reconstructionThread.join();
}

In theory this should stop the wait() call within a worker thread's loop and then break out of the loop via m_QuitReconstructionFlag. However, this doesn't work.

What does work is the following:

  1. remove the lambda from the wait functions
  2. call notify_all() on the condition variables after setting the quit flags to true

This works fine for me. However, the question is: why doesn't the lambda work?

Roland Deschain

3 Answers

why doesn't the lambda work?

It works just fine, by itself.

However, C++ requires complex rules to be followed to properly synchronize multiple execution threads. Just because one execution thread sets a particular variable to a value does not guarantee you, in any way, that other execution threads will see the variable's new value. The synchronization rules govern that behavior.

So, this lambda works just fine. In its own execution thread. But if you want this lambda to observe changes to the value, made by other execution threads, this must be correctly synchronized.

Additionally, if you review your documentation of wait(), you should find a detailed explanation that says that if the condition function evaluates to false, it will not be called again until the condition variable is notified.

What does work is ... call notify_all()

Well, of course. Since wait() requires the condition variable to be notified, before it checks the waited-on condition again, then that's what you must do!

Finally, notifying the condition variable will work correctly in most cases but, as I mentioned, the synchronization rules (in which mutexes and condition variables play an important part) have some edge cases where this, by itself, will not work. You must strictly follow this sequence of events in order to have proper synchronization in all edge cases:

  1. Lock the same mutex that another execution thread has locked before waiting on its condition variable.
  2. Notify the condition variable.
  3. Unlock the mutex
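
The sequence above could be sketched roughly as follows. This is only an illustration, not the asker's actual code: the `Worker` struct and its members `mutex_`, `cv_`, `quit_` and `wakeups_` are hypothetical names, and the sketch assumes the flag is modified while the mutex is held, between steps 1 and 2.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical worker, used only to illustrate the lock/notify/unlock sequence.
struct Worker {
    std::mutex mutex_;
    std::condition_variable cv_;
    bool quit_ = false;  // guarded by mutex_
    int wakeups_ = 0;    // how often run() got past wait(); guarded by mutex_

    // Thread body: sleep until asked to quit.
    void run() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return quit_; });
        ++wakeups_;
    }

    void stop() {
        std::lock_guard<std::mutex> lock(mutex_);  // 1. lock the waiter's mutex
        quit_ = true;                              //    change the waited-on state
        cv_.notify_all();                          // 2. notify the condition variable
    }                                              // 3. unlock (RAII)
};

// Starts a worker thread, stops it, and reports whether it woke exactly once.
bool stop_wakes_worker() {
    Worker w;
    std::thread t(&Worker::run, &w);
    w.stop();
    t.join();
    return w.wakeups_ == 1;
}
```

Because the flag is only read and written under the mutex, it does not matter whether stop() runs before or after the worker reaches wait(): either the predicate is already true, or the notification arrives while the worker is waiting.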
Sam Varshavchik
  • Thanks for the detailed answer, this clears things up quite a bit! :) I was watching several tutorials on the subject, none of them mentioning these details – Roland Deschain Sep 24 '21 at 12:59
  • You've just discovered, by yourself, why I keep repeating the following on Stackoverflow: C++ is the most complicated general purpose programming language in use today. Any clown can publish a web site, or upload a rambling video to Youtube. But the only practical way to learn C++ is [with a good textbook](https://stackoverflow.com/questions/388242/). It costs money and dead trees to publish one. A publisher won't risk it without fully vetting the author, the material, and editing the book thoroughly. – Sam Varshavchik Sep 24 '21 at 14:01
  • You need step 1.5: **modify** the state that the condition variable checks in the wait-lambda or equivalent. (After you do that modification, unlocking the mutex before notifying will technically work; in fact, if the state is atomic, you can get away with locking the mutex before you notify, but it must be locked for some interval between modifying the state and notifying or there is a (non-memory-model) race condition.) If the OP makes the bool atomic, the (memory model) race condition goes away, but *their code still doesn't always work*, because an algorithmic race condition remains. – Yakk - Adam Nevraumont Sep 24 '21 at 15:09

You must protect m_QuitReconstructionFlag with the same mutex used by the condition variable wait.

Or it won't work.

When using a condition variable, if you do not want to learn about the C++ memory model in extreme detail, you should follow "best practices" that defend you against problems.

The best practice for a condition variable is to bundle three things together:

  1. The condition variable.
  2. A mutex (often mutable).
  3. A state.

Then bundle all 3 of them up behind a single abstraction of some kind.

To change the state:

  1. Lock the mutex
  2. Change the state
  3. Notify the condition variable appropriately
  4. Unlock the mutex

Do not think that the state being atomic means you don't have to lock the mutex.

When you want to wait on the condition variable:

  1. Lock the mutex
  2. Wait, passing a lambda that checks the state.
  3. When exiting wait, you are free to update the state.
  4. Unlock the mutex

In general, use a unique_lock to lock the mutex in all of the above cases, and rely on RAII to unlock it.
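
As a minimal sketch of such a bundle, assuming the state is a single bool: the class name `gate` and its members are made up for this illustration and do not come from any library.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-shot "gate": bundles a condition variable, a mutex and a bool.
class gate {
public:
    // Change the state: lock, modify, notify, unlock (via RAII).
    void open() {
        std::unique_lock<std::mutex> l(m);
        opened = true;
        cv.notify_all();
    }
    // Wait on the state: lock, wait with a predicate, unlock (via RAII).
    void wait() {
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this] { return opened; });
    }
    // Reading the state also goes through the mutex.
    bool is_open() const {
        std::unique_lock<std::mutex> l(m);
        return opened;
    }
private:
    std::condition_variable cv;
    mutable std::mutex m;  // mutable so const members can lock it
    bool opened = false;   // the state, guarded by m
};

// One thread waits on the gate while the calling thread opens it.
bool gate_demo() {
    gate g;
    bool passed = false;
    std::thread waiter([&] { g.wait(); passed = true; });
    g.open();
    waiter.join();
    return passed && g.is_open();
}
```

Callers never see the mutex, the condition variable or the bool directly, only open(), wait() and is_open().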

What, exactly, the state is, and when you notify, is up to you.

Outside of this bundle and its API, do not interact directly with the mutex, the state, or the condition variable.

Copy or move data out of the API if needed; don't hold pointers, references, or iterators into it.

Your state can consist of more than one variable, such as a queue and a bool for shutdown.

For example, suppose you have a queue.

#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

template<class T>
struct cv_queue {
  // Blocks until an element is available or the queue is aborted.
  // Returns an empty optional if the queue was aborted.
  std::optional<T> pop() {
    auto l = lock();
    cv.wait( l, [&]{ return aborted || !queue.empty(); } );
    if (aborted) return {};
    auto retval = std::move(queue.front());
    queue.pop_front();
    return retval;
  }
  void push( T in ) {
    auto l = lock();
    queue.push_back( std::move(in) );
    cv.notify_one();
  }
  // Wakes every waiting thread; all current and future pop() calls return empty.
  void abort_everything() {
    auto l = lock();
    aborted = true;
    cv.notify_all();
  }
  bool empty() const {
    auto l = lock();
    return queue.empty();
  }
private:
  std::condition_variable cv;
  mutable std::mutex m;
  std::deque<T> queue;
  bool aborted=false;

  auto lock() const { return std::unique_lock( m ); }
};

Adding pop_wait_for or try_pop isn't hard.
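
One way those two members might look, as a sketch rather than a definitive implementation. The queue is repeated here in trimmed form so the block compiles on its own; the `take_front` helper and the exact signatures are assumptions made for this example.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

template<class T>
struct cv_queue {
  void push( T in ) {
    auto l = lock();
    queue.push_back( std::move(in) );
    cv.notify_one();
  }
  // Never blocks: empty optional if nothing is queued or the queue was aborted.
  std::optional<T> try_pop() {
    auto l = lock();
    if (aborted || queue.empty()) return {};
    return take_front();
  }
  // Blocks for at most `timeout`: empty optional on timeout or abort.
  template<class Rep, class Period>
  std::optional<T> pop_wait_for( std::chrono::duration<Rep, Period> timeout ) {
    auto l = lock();
    // wait_for with a predicate returns the predicate's final value.
    bool ready = cv.wait_for( l, timeout, [&]{ return aborted || !queue.empty(); } );
    if (!ready || aborted) return {};
    return take_front();
  }
  void abort_everything() {
    auto l = lock();
    aborted = true;
    cv.notify_all();
  }
private:
  // Hypothetical helper: caller must hold the mutex and the queue must be non-empty.
  T take_front() {
    T retval = std::move(queue.front());
    queue.pop_front();
    return retval;
  }
  std::unique_lock<std::mutex> lock() const { return std::unique_lock<std::mutex>( m ); }

  std::condition_variable cv;
  mutable std::mutex m;
  std::deque<T> queue;
  bool aborted=false;
};
```

Both members reuse the same lock and the same predicate as the blocking pop(), so the invariants of the bundle stay in one place.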

A simple 3-part wrapper around data or whatever isn't hard to write. Making it more generic, in my experience, doesn't add much to it being understandable.

Yakk - Adam Nevraumont

Here, the lambda returning true is not by itself what stops the wait; rather, the lambda is there to account for spurious wake-ups. A call to notify_one() or notify_all() on the condition_variable is what wakes the waiting thread.

Rather than removing the lambda, you must simply change the stop function to

void WorkDispatcher::Stop()
{
    // for all worker threads do this
    m_Reconstructor.m_QuitReconstructionFlag = true;
    m_RgbConditional.notify_all();
    if (m_reconstructionThread.joinable())
        m_reconstructionThread.join();
}

From the documentation of wait() you can see that the overload taking a predicate (wait(lock, predicate)) is equivalent to

while (!predicate())
    wait(lock);

Hence, when you call Stop(), it sets the flag read by the predicate to true and notifies the condition variable. The woken thread then checks the predicate again and, since it is now true, wait(predicate) returns.

In the earlier case, the flag was set to true but the waiting thread was never woken up.
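
To make the role of the notification concrete, here is a hand-written stand-in for a predicate wait. It is only a sketch: `wait_until_true` and `notified_waiter_returns` are hypothetical names, and the flag is set under the mutex, which goes slightly beyond the Stop() shown above.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper that behaves like cv.wait(lock, pred), written out by hand.
// Returns how many times the predicate was evaluated.
template<class Predicate>
int wait_until_true(std::condition_variable& cv,
                    std::unique_lock<std::mutex>& lock,
                    Predicate pred) {
    int checks = 1;      // the first evaluation happens in the loop condition
    while (!pred()) {    // re-evaluated after every wake-up, spurious or not
        cv.wait(lock);   // sleeps until notified (or woken spuriously)
        ++checks;
    }
    return checks;
}

// A waiter blocks on a quit flag; the calling thread sets it and notifies.
bool notified_waiter_returns() {
    std::mutex m;
    std::condition_variable cv;
    bool quit = false;
    int checks = 0;
    std::thread waiter([&] {
        std::unique_lock<std::mutex> lock(m);
        checks = wait_until_true(cv, lock, [&] { return quit; });
    });
    {
        std::lock_guard<std::mutex> lock(m);
        quit = true;
        cv.notify_all();
    }
    waiter.join();
    return checks >= 1;
}
```

Without the notify_all() call, a waiter that is already asleep inside cv.wait(lock) has no reason to wake up and look at the flag again.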

WARhead