3

I want to have a thread (std::thread) that does its work every 60 seconds, otherwise sleeps and instantly returns if the outside requests it. std::jthread is not an option, I'm limited to C++14.

std::condition_variable cv;

//Thread1:
void ThreadWork()
{
    while(cv.wait_for(someLock, 60s) == std::cv_status::timeout)
    {
        Work();
    }
    return;
}

//Thread2:
void RequestEnd()
{
    cv.notify_one();
}

The idea here is that if the return value is std::cv_status::timeout, 60s have passed and we do the work normally. Otherwise, return.

Currently, I get some complaints from the runtime about the lock, or just straight up std::terminate.

Doing some tricks with a loop checking for an atomic variable is not what I want; this is about the thread sleeping.

Am I going in the right direction, at least?

1201ProgramAlarm
Thomas B.
  • Can you include how you are handling the lock? It doesn't appear like you are managing the lock at all in this code. – mascoj Aug 31 '21 at 13:59
  • Traditionally, you would want to use some system specific features for such a use case. On Windows it would be `WaitForMultipleObjects()`, on linux it would be epoll, on FreeBSD kqueue. Usually you want to wait for work orders or a timeout, with a shutdown work order amongst others. If you want it portable, something like libEvent comes to mind. Last I checked (I do not check very often), C++ standard library does not offer a portable facility for that. – BitTickler Aug 31 '21 at 14:01
  • I have tried many ways, none of them work. So this is pretty much part of the question. – Thomas B. Aug 31 '21 at 14:03

5 Answers

3

It is wrong to use condition_variable without another condition, due to spurious wakes. You need at least a bool variable.

The someLock should protect that other condition, and it must already be locked when it is passed to wait_for.

A lock acquired in a thread should be released in the same thread afterwards. (Usually, though not always, locks are stack variables).

Also, due to spurious wakes, the non-predicate wait_for is inconvenient to use, because you need to recalculate the timeout after each wake. Prefer the predicate form, and check the condition in your predicate.
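A minimal C++14 sketch of the points above. The names `StopSignal` and `run_periodically` are hypothetical, not from the question: a `bool` flag guarded by the mutex plays the role of the "other condition", and the predicate overload of `wait_for` re-checks that flag after a spurious wake and keeps track of the remaining timeout itself.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: a stop flag, the mutex guarding it, and the
// condition variable used to wake the sleeping worker early.
class StopSignal {
public:
    // Called from the controlling thread.
    void request_stop() {
        {
            std::lock_guard<std::mutex> lock(m_);
            stop_ = true;
        }
        cv_.notify_all();
    }

    // Sleeps for up to `timeout`. Returns true if a stop was requested
    // (possibly before this call), false if the full timeout elapsed.
    template <class Rep, class Period>
    bool wait_for(const std::chrono::duration<Rep, Period>& timeout) {
        assert(timeout >= timeout.zero());
        // Locked here, in the waiting thread, and released on return.
        std::unique_lock<std::mutex> lock(m_);
        return cv_.wait_for(lock, timeout, [this] { return stop_; });
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    bool stop_ = false;  // the "other condition"; only touched under m_
};

// Worker loop: sleeps for `interval`, then runs `work`, until stopped.
template <class Work, class Rep, class Period>
void run_periodically(StopSignal& stop, Work work,
                      const std::chrono::duration<Rep, Period>& interval) {
    while (!stop.wait_for(interval)) {
        work();
    }
}
```

With this sketch, the worker would be started as `std::thread t([&] { run_periodically(stop, Work, std::chrono::seconds(60)); });` and ended with `stop.request_stop(); t.join();`. Because the flag is checked before sleeping, a stop requested before the worker reaches `wait_for` is not lost.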

Alex Guteniev
  • What is the point of the `condition_variable` in the first place then? Especially if I intend to use the timeout and not the notify most of the time. Wouldn't this mean that I'd have to check if, yes, the thing actually elapsed? – Thomas B. Aug 31 '21 at 14:09
  • @ThomasB., The check is needed for negative case, when time has not elapsed, to see if termination notification really was meant. – Alex Guteniev Aug 31 '21 at 14:24
3

std::future<void> seems to do the trick.

// Worker thread setup
void Work(std::future<void> killSwitch)
{
    while(killSwitch.wait_for(60s) == std::future_status::timeout)
    {
        // ... Stuff
    }
}

// Controller thread
std::promise<void> killSwitch;
std::thread worker{ Work, std::move(killSwitch.get_future()) };

// ... Stuff

killSwitch.set_value(); // <-- this will break the loop
worker.join();

No mutexes, condition variables, spurious wakes, locks or atomics.
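For reference, here is a self-contained variant of the same idea that compiles as C++14. The `PeriodicWorker` wrapper is a hypothetical name introduced for illustration; it only bundles the promise and the thread so that the destructor sets the kill switch and joins.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>
#include <utility>

// Hypothetical RAII wrapper around the promise/future kill switch.
class PeriodicWorker {
public:
    template <class Work, class Rep, class Period>
    PeriodicWorker(Work work, std::chrono::duration<Rep, Period> interval)
        : thread_(
              [](Work w, std::chrono::duration<Rep, Period> d,
                 std::future<void> kill) {
                  // timeout: the interval elapsed, so do the work;
                  // ready: the promise was fulfilled, so leave the loop.
                  while (kill.wait_for(d) == std::future_status::timeout) {
                      w();
                  }
              },
              std::move(work), interval, kill_switch_.get_future()) {
        assert(interval >= interval.zero());
    }

    ~PeriodicWorker() { stop(); }

    // Wakes the worker immediately and waits for it to finish.
    void stop() {
        if (thread_.joinable()) {
            kill_switch_.set_value();
            thread_.join();
        }
    }

private:
    std::promise<void> kill_switch_;  // declared first, so it exists before thread_ starts
    std::thread thread_;
};
```

Usage would look like `PeriodicWorker worker(Work, std::chrono::seconds(60));`. The `60s` literal in the snippet above additionally needs `using namespace std::chrono_literals;`, which is available in C++14.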

Thomas B.
1

std::condition_variable is a low level primitive. Part of its design is that spurious wakeups happen; this means, sometimes people waiting on notifications will get notified even though nobody sent one.

I suspect the reasons are twofold:

  1. The underlying mechanism on most OSes has such spurious wakeups, so the standard library implementor would have to write code to handle them in order to hide them from the end user.

  2. In almost every use case of std::condition_variable, the code that handles threads moving faster or slower than expected ends up connected to, and overlapping with, efficient spurious-wakeup handling. So if the library handled it for you, you'd end up duplicating the work anyhow.

Your next problem is that the logic you have described is a bit vague. Time in a computer should not be treated as absolute; there is no "during the same 60s interval" in two different threads.

There is only happens-before some synchronization and happens-after that synchronization.

I suspect you might want a latch. A latch is a synchronization primitive (but less primitive than condition variable). Think of a latch on a door or a gate. It starts off closed, and you can open it; but once you open it, there is no way to close it again.

Here, the latch being "open" means "worker thread, cease your endless toil".

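#include <chrono>
#include <condition_variable>
#include <mutex>

struct latch {
  // Open the latch and wake every waiter; it can never be closed again.
  void open_latch() {
    auto l = lock();
    open = true;
    cv.notify_all();
  }
  // Block until the latch is open.
  void wait() const {
    auto l = lock();
    cv.wait(l, [&]{ return open; });
  }
  // Returns true if the latch is open, false if the duration elapsed first.
  template<class Rep, class Period>
  bool wait_for(const std::chrono::duration<Rep, Period>& duration) const {
    auto l = lock();
    return cv.wait_for(l, duration, [&]{ return open; });
  }
  // Returns true if the latch is open, false if `when` was reached first.
  template<class Clock, class Duration>
  bool wait_until(const std::chrono::time_point<Clock, Duration>& when) const {
    auto l = lock();
    return cv.wait_until(l, when, [&]{ return open; });
  }
private:
  // Explicit return type: a deduced `auto` cannot be used by the members above.
  std::unique_lock<std::mutex> lock() const { return std::unique_lock<std::mutex>(m); }
  mutable std::mutex m;
  bool open = false;
  // mutable: waiting is a non-const operation, but the wait functions are const.
  mutable std::condition_variable cv;
};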

Now your code looks like:

latch l;

// Thread1:
void ThreadWork()
{
    while(!l.wait_for(60s))
    {
        Work();
    }
    return;
}

// Thread2:
void RequestEnd()
{
    l.open_latch();
}

(Code not tested, but this isn't my first rodeo).

There are a bunch of things this pattern handles, including the latch being opened before anyone waits on it.

I'd advise using wait_until instead of wait_for if you want X instances of work to have occurred after X minutes (note that if the work takes more than 1 minute, the waiting will be reduced to near zero time). If you instead want a 1 minute "break" between runs of the work, use wait_for.
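To make the difference concrete, here is a rough sketch of the wait_until variant. `stop_latch` is a cut-down copy of the latch so the snippet stands alone, and `run_at_fixed_rate` is a hypothetical name. Each deadline is computed from the previous deadline rather than from "now", so the time spent inside the work does not accumulate as drift.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Minimal one-way latch, repeated here so the sketch is self-contained.
class stop_latch {
public:
    void open() {
        std::lock_guard<std::mutex> l(m_);
        open_ = true;
        cv_.notify_all();
    }
    // Returns true if the latch is open, false if `when` was reached first.
    template <class Clock, class Duration>
    bool wait_until(const std::chrono::time_point<Clock, Duration>& when) {
        std::unique_lock<std::mutex> l(m_);
        return cv_.wait_until(l, when, [this] { return open_; });
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    bool open_ = false;
};

// Fixed rate: deadlines are start + period, start + 2 * period, ...
template <class Work>
void run_at_fixed_rate(stop_latch& stop, Work work,
                       std::chrono::steady_clock::duration period) {
    assert(period > std::chrono::steady_clock::duration::zero());
    auto next = std::chrono::steady_clock::now() + period;
    while (!stop.wait_until(next)) {
        work();
        // If work() overran the period, `next` is already in the past
        // and the following wait returns almost immediately.
        next += period;
    }
}
```

The wait_for loop shown earlier gives the other behaviour: the full interval is slept after each run of the work, so the runs slowly drift by however long the work takes.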

Almost all uses of std::condition_variable have this three-part system: the mutex, the payload, and the condition variable. The mutex should pretty much always guard the payload (even if atomic!) and only the payload. Sometimes the payload is two-part, like an abort flag and a more complex data structure.

Yakk - Adam Nevraumont
  • Some recommend unlocking the lock after the condition changed (just before `notify_all`) so that notified threads aren't locked again immediately after waking. I don't know if it actually matters (I asked https://stackoverflow.com/q/63736832/2945027 ) – Alex Guteniev Aug 31 '21 at 15:49
  • @AlexGuteniev Someone who I recall as a compiler library implementor once told me to hold onto the mutex while notifying to avoid a double-tap wakeup; the notification knows which listeners are going to be waiting on the mutex, and avoids waking them early before the mutex is free. I don't know if that is right, but it seemed plausible. – Yakk - Adam Nevraumont Aug 31 '21 at 15:51
0

With threading you need to be very precise. Your idea of using a condition variable is correct, but if you try to stop the thread before it has started, the condition variable's notify is missed and your thread will never stop. There are other issues with condition variables as well; see my class state_variable. This example is quite detailed, but it addresses all those issues and avoids race conditions.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>

//-----------------------------------------------------------------------------------------------------
// state of the thread that does the waiting
// used for stable starting and stopping

enum class thread_state
{
    idle,
    starting,
    running,
    stopping,
    stopped
};

//-----------------------------------------------------------------------------------------------------
// helper class for use of std::condition_variable, makes code more readable
// and takes into account the pitfalls of condition variables : 
// https://www.modernescpp.com/index.php/c-core-guidelines-be-aware-of-the-traps-of-condition-variables


template<typename T>
class state_variable
{
public:
    state_variable() = delete;
    state_variable(const state_variable&) = delete;
    state_variable(state_variable&&) = delete;
    state_variable& operator=(const state_variable&) = delete;

    explicit state_variable(const T& value) :
        m_value{ value }
    {
    }

    void operator=(const T& value) noexcept
    {
        {
            std::unique_lock<std::mutex> lock(m_value_mutex);
            m_value = value;
        }
        m_value_changed.notify_all();
    }

    // atomic check and set
    T set_if(const T& from_value, const T& to_value) noexcept
    {
        {
            std::unique_lock<std::mutex> lock(m_value_mutex);
            if (m_value != from_value) return from_value;
            m_value = to_value;
        }
        m_value_changed.notify_all();
        return to_value;
    }

    const bool try_wait_for(const T& value, const std::chrono::steady_clock::duration& duration) const noexcept
    {
        auto pred = [this, value] { return (m_value == value); };
        std::unique_lock<std::mutex> lock(m_value_mutex);
        if (pred()) return true;
        return m_value_changed.wait_for(lock, duration, pred);
    }

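    void wait_for(const T& value) const
    {
        // Wait without a timeout. Passing duration::max() to wait_for can
        // overflow on some standard library implementations.
        std::unique_lock<std::mutex> lock(m_value_mutex);
        m_value_changed.wait(lock, [this, &value] { return (m_value == value); });
    }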

private:
    // mutables so I could make the const promises on wait 
    // that they wont change the observable state (m_value)
    // of this class.
    mutable std::mutex m_value_mutex;
    mutable std::condition_variable m_value_changed;
    std::atomic<T> m_value;
};

//-----------------------------------------------------------------------------------------------------

class Worker final
{
public:
    template<typename lambda_t>
    Worker(lambda_t lambda, const std::chrono::steady_clock::duration& loop_time) :
        m_state{ thread_state::idle },
        m_looptime{ loop_time },
        m_work{ lambda }
    {
    };

    Worker(const Worker&) = delete;
    Worker(Worker&&) = delete;
    Worker& operator=(const Worker&) = delete;

    void Start()
    {
        if (m_state.set_if(thread_state::idle, thread_state::starting) != thread_state::starting)
        {
            throw std::runtime_error("only an idle Worker can be started");
        }

        // 
        // Note that std::async, and std::thread don't guarantee the thread is active
        // when the call returns!
        // 
        // it is okay to capture "this" because the destructor of the 
        // Worker synchronizes with termination of this thread through the future
        // So the thread will have a shorter life cycle than the worker!
        //
        m_future = std::async(std::launch::async, [this]
        {
            // Set indication that the thread has really started.
            m_state = thread_state::running;

            do
            {
                m_work();
        
                // using a statevariable to check for stopping means it can respond 
                // during the one second delay and stop immediately. 
                // this is an advantage over using sleep
            } while (!m_state.try_wait_for(thread_state::stopping, m_looptime));

            m_state = thread_state::stopped;
        });

        // Wait for the thread to have really started
        // this way we have a clear post condition for start
        m_state.wait_for(thread_state::running);
    }

    void Stop()
    {
        // only allow a running Worker to be stopped.
        // in all other states Stop does nothing
        if (m_state.set_if(thread_state::running, thread_state::stopping) == thread_state::stopping)
        {
            // synchronization with stopped state, as set by other thread
            m_state.wait_for(thread_state::stopped);

            // future get is not really needed for synchronization.
            // but if thread threw an exception it's rethrown here 
            m_future.get();
        }
    }

    ~Worker()
    {
        // Automatically stop thread if this hasn't already happened.
        Stop();
    }

private:
    std::future<void> m_future;
    state_variable<thread_state> m_state;
    std::chrono::steady_clock::duration m_looptime;
    std::function<void()> m_work;
};


int main()
{
    auto work = []
    {
        std::cout << ".";
    };

    Worker worker(work, std::chrono::seconds(1)); // make 60 for your case and replace work with lambda or std::function<void()>

    std::cout << "Press enter to stop..." << std::endl;
    std::cout << "Every second Work() will be called, and a '.' be printed" << std::endl;
    std::string input;

    worker.Start();
    std::getline(std::cin,input);

    return 0;

    // destructor of worker takes care thread is stopped nicely before exiting the program!
}
Pepijn Kramer
0

My recommendation is NOT to use condition variables if it can be avoided. They have many pitfalls, and in the end you still do not have a general solution for "notifying a thread".

Let me describe the general case first, so you can see where this is going:

In general, you want to

  • Do something after a fixed amount of time (or a scheduled point in time)
  • Receive application defined "messages" or "events", perhaps put into a queue by other threads
  • Wait for resources, such as socket handles, file handles, ...

A condition variable approach to your subset of the problem does not scale when your requirements change and you need more features of the general case.

So, given the risk of getting it wrong with condition variables, and the fact that this approach lacks flexibility and does not account for future changes, I advise going straight to the general solution.

As I posted in my comment below the question, you might have to use system-specific APIs or a library that does that for you and gives you a portable API. (I do not regularly check whether C++ in its latest mutation has added such a facility.)

BitTickler
  • C++20 has latch, barrier, semaphore and atomic wait, all of these are usable here. It is disputable whether `condition_variables` should not be recommended. Yes, they are restricted, but this restriction is for good; once you get the right pattern to use them, you'll avoid mistakes that you would do with, say Windows Event in some more complex scenario. – Alex Guteniev Aug 31 '21 at 14:39
  • It's funny how people who started out on POSIX systems keep complaining about Windows Event and call it flawed, without ever giving a reason for their theory (I used Windows events for decades without any surprises). When I had to use POSIX condition variables for the first time, I kept banging my head on the table and wrote myself a C++ class `CEvent`, trying to get back what I was used to and what fits my way of concurrent programming. – BitTickler Aug 31 '21 at 14:42
  • I started on Windows. It took me some time to learn to use `condition_variable`. Sure Windows Event is not flawed (except maybe `PulseEvent`), but it is very low level and needs more careful use (especially manual-reset one). – Alex Guteniev Aug 31 '21 at 14:45
  • Manual reset event is cool if you want to have notifiable queues (e.g. for your shutdown and other events to the thread). `push()` into the queue sets the event and `pop()` resets it if the queue is empty (and yes, you need some interlocked (atomic) size counter, not just some variable, to determine if the queue is empty; else you get a race condition). But the *huge* diff between Windows and POSIX is that you can wait for resources and events with one `WaitForMultipleObjects()` call, but POSIX thread objects and sockets never got a unified wait... Hence my advice for a general solution. – BitTickler Aug 31 '21 at 14:49
  • @BitTickler: for a queue on Windows, I've found a counted semaphore works really nicely. For example: https://stackoverflow.com/a/55854116/179910 – Jerry Coffin Aug 31 '21 at 16:18