90

A project I'm working on uses multiple threads to do work on a collection of files. Each thread can add files to the list of files to be processed, so I put together (what I thought was) a thread-safe queue. Relevant portions follow:

// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
//                   notify waiting threads of a new item in the queue

void FileQueue::enqueue(std::string&& filename)
{
    std::lock_guard<std::mutex> lock(qMutex);
    q.push(std::move(filename));

    // Notify anyone waiting for additional files that more have arrived
    populatedNotifier.notify_one();
}

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    if (q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
            std::string ret = q.front();
            q.pop();
            return ret;
        }
        else {
            return std::string();
        }
    }
    else {
        std::string ret = q.front();
        q.pop();
        return ret;
    }
}

However, I am occasionally segfaulting inside the if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { } block, and inspection in gdb indicates that the segfaults are occurring because the queue is empty. How is this possible? It was my understanding that wait_for only returns cv_status::no_timeout when it has been notified, and this should only happen after FileQueue::enqueue has just pushed a new item to the queue.

Matt Kline
  • 10,149
  • 7
  • 50
  • 87
  • 4
    Here you go: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html – GManNickG Mar 07 '13 at 18:04
  • 1
    Question, why are you taking `filename` by ref-ref? I can't see any reason for that here? – Tony The Lion Mar 07 '13 at 18:20
  • 1
    @TonyTheLion Generally in C++ it is more efficient to pass objects by reference than to make a copy. In this case I'm also using move semantics, which lets the compiler move the contents of the string into the queue instead of making another copy. – Matt Kline Mar 07 '13 at 20:24
  • @slavik262: Your use of `std::forward` here isn't normal (that used in "universal references"), you should just `std::move` it. – GManNickG Mar 07 '13 at 21:22
  • @GManNickG Sorry, could you elaborate? – Matt Kline Mar 08 '13 at 00:31
  • @slavik262: You should just use `std::move(filename)` here, `std::forward` is intended to be used in a perfect forwarding context. – GManNickG Mar 08 '13 at 01:08
  • 2
    Actually the preferred way to take advantage of move semantics here is to use `std::move` and take the `filename` parameter of `enqueue` by value instead of by non-const rvalue reference. As it is, it can only be called with rvalues which is probably not what you intended. – Ben Hymers May 24 '13 at 09:51
  • Furthering what @BenHymers said, the *caller* of this would have to use `std::move`; using it here is pointless, as you already have an rvalue reference, and all `std::move` does is cast to one. Declaring a by-value argument here, then using `std::move` to push into the queue is ideal, as the caller can then either pass by value (which will make a copy before invoke, then move that copy into the queue via `std::move`), or by rvalue reference (caller uses `std::move`, in which case the move-ctor is called to construct the arg, which is then `std::move`d *again* to push to the queue). – WhozCraig Mar 07 '19 at 15:23

9 Answers

85

It is best to guard the wait with a while-loop whose test is the inverse of the condition you are waiting for: `while (!some_condition)`. Inside this loop, the thread goes back to sleep whenever the condition still fails.

This way, if your thread is awoken--possibly spuriously--your loop will still check the condition before proceeding. Think of the condition as the state of interest, and think of the condition variable as more of a signal from the system that this state might be ready. The loop will do the heavy lifting of actually confirming that it's true, and going to sleep if it's not.

I just wrote a template for an async queue, hope this helps. Here, q.empty() is the inverse condition of what we want: for the queue to have something in it. So it serves as the check for the while loop.

#ifndef SAFE_QUEUE
#define SAFE_QUEUE

#include <queue>
#include <mutex>
#include <condition_variable>

// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
  SafeQueue(void)
    : q()
    , m()
    , c()
  {}

  ~SafeQueue(void)
  {}

  // Add an element to the queue.
  void enqueue(T t)
  {
    std::lock_guard<std::mutex> lock(m);
    q.push(t);
    c.notify_one();
  }

  // Get the "front"-element.
  // If the queue is empty, wait until an element is available.
  T dequeue(void)
  {
    std::unique_lock<std::mutex> lock(m);
    while(q.empty())
    {
      // wait() releases the lock for the duration of the wait and reacquires it afterwards.
      c.wait(lock);
    }
    T val = q.front();
    q.pop();
    return val;
  }

private:
  std::queue<T> q;
  mutable std::mutex m;
  std::condition_variable c;
};
#endif
Daniel Russell
  • 578
  • 4
  • 9
ChewOnThis_Trident
  • 2,105
  • 2
  • 18
  • 22
  • 3
    Thanks! Thankfully I've already solved the issue using predicates as described [here](http://en.cppreference.com/w/cpp/thread/condition_variable/wait_for). – Matt Kline Apr 22 '13 at 02:38
  • 2
    Simplest and most elegant solution of the lot, IMHO. – kuroi neko Sep 13 '14 at 00:48
  • Great solution! Thanks for sharing. One question though. How do you deal with the situation where the queue is full? Do we need to consider it while designing a queue? – Sarah Jun 28 '15 at 22:58
  • Sorry for the late response, I wouldn't worry about it, but I haven't been working in C++ in a long time and the systems I am on are not really memory constrained. – ChewOnThis_Trident Jul 28 '15 at 20:27
  • 17
    FYI: The `while(q.empty())` loop is equivalent to: `c.wait( lock, [&]{ return !q.empty(); } );` – Ahmed Nassar Feb 22 '18 at 01:33
  • 1
    When `dequeue` is called, and the queue is empty, it will unnecessarily take the lock and would hold it. This would let enqueuer to block. You've got a deadlock. – Ajay Dec 13 '19 at 10:56
  • 4
    @Ajay for the benefit of those coming here (like me) in the future reading you comment, no, that's not the case, because [`condition_variable::wait` releases the lock and only reacquires when awaken](https://en.cppreference.com/w/cpp/thread/condition_variable/wait) – GPhilo Nov 06 '20 at 12:23
  • @GPhilo, Yes, you are right. I was misinformed about CV when I wrote that comment. – Ajay Nov 06 '20 at 12:32
  • 1
    Would it be better to use `void enqueue(T&& t)` and `q.push(std::forward(t));` so we can move into the queue? – GPhilo Apr 15 '21 at 09:47
  • 1
    @AhmedNassar `while()` is faster than lambda function. Profile it and see by yourself. – Victor Volpe Mar 09 '22 at 17:52
  • Also the `c.notify_one();` could be outside the lock, so that the waiting side thread does not have to do additional `wait`s to acquire the lock in case it wakes up immediately. – Vassilis Apr 30 '22 at 08:44
37

According to the standard, condition_variables are allowed to wake up spuriously, even if the event hasn't occurred. After a spurious wakeup, wait_for will return cv_status::no_timeout (since it woke up instead of timing out), even though it hasn't been notified. The correct solution is of course to check whether the wakeup was actually legitimate before proceeding.

The details are specified in the standard §30.5.1 [thread.condition.condvar]:

—The function will unblock when signaled by a call to notify_one(), a call to notify_all(), expiration of the absolute timeout (30.2.4) specified by abs_time, or spuriously.

...

Returns: cv_status::timeout if the absolute timeout (30.2.4) specified by abs_time expired, otherwise cv_status::no_timeout.

Community
  • 1
  • 1
Grizzly
  • 19,595
  • 4
  • 60
  • 78
  • 1
    And how would you suggest doing that? Just by checking if the queue is empty again? – Matt Kline Mar 07 '13 at 18:03
  • 4
    Addendum: I could likely protect against spurious wakeups using a predicate such as the one [here](http://en.cppreference.com/w/cpp/thread/condition_variable/wait_for). – Matt Kline Mar 07 '13 at 18:05
  • 2
    Yes. It's called a _condition_ variable because it is associated with some condition, which you must check is actually true. In your case the condition to check is `!q.empty()` – Jonathan Wakely Mar 07 '13 at 19:55
  • 5
    If you use a lambda as the optional argument to the wait() call, it will do the check for you and prevent the spurious wakeup from having any effect. – derpface Jan 19 '15 at 14:19
  • 7
    As a historical note, this is most likely done in order to support the behavior of low-level Unix system calls. If you find that somewhat unsatisfactory from a client design perspective, so did Richard Gabriel way back in 1989. His musings on the subject became a very famous software design essay, [The Rise of "Worse-is-Better"](https://www.jwz.org/doc/worse-is-better.html) – T.E.D. Nov 07 '16 at 19:28
20

This is probably how you should do it:

void push(std::string&& filename)
{
    {
        std::lock_guard<std::mutex> lock(qMutex);

        q.push(std::move(filename));
    }

    populatedNotifier.notify_one();
}

bool try_pop(std::string& filename, std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);

    if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); }))
        return false;

    filename = std::move(q.front());
    q.pop();

    return true;    
}
ronag
  • 49,529
  • 25
  • 126
  • 221
  • Thanks! I ended up doing something similar. – Matt Kline Mar 07 '13 at 18:13
  • Two comments on that otherwise fine code: 1) before notify_one, I would unlock the mutex for reasons found on http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one 2) the wait process can be awoken spuriously, so I would additionally introduce a bool variable indicating that push is indeed over – IceFire Oct 30 '14 at 10:11
  • 1
    1) Minor optimization. 2) This overload of `wair_for` handles spurious wakes through the second argument. – ronag Oct 30 '14 at 11:35
  • Sorry, you are right. Actually, 1) is accomlished via lock_guard already. My error in reasoning for 2) was that I thought a push operation could be ongoing if the queue was not empty... however, in this case the still-blocked mutex would keep wait_for waiting anyways. Thanks! – IceFire Oct 30 '14 at 13:30
  • I have a question about this. Why are the lock wrappers used for push and pop different? Why not use std::unique_lock on both? – user7024 Jan 12 '23 at 08:37
13

Adding to the accepted answer, I would say that implementing a correct multi-producer/multi-consumer queue is difficult (easier since C++11, though).

I would suggest you try the (very good) Boost.Lockfree library; its queue structure will do what you want, with wait-free/lock-free guarantees and without the need for a C++11 compiler.

I am adding this answer now because the lock-free library is quite new to Boost (since 1.53, I believe).

quantdev
  • 23,517
  • 5
  • 55
  • 88
  • 2
    Thanks for pointing out that library. There currently doesn't seem to be documentation for the queue, however. Any idea on where that can be found? – Matt Kline May 21 '14 at 13:31
5

I would rewrite your dequeue function as:

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    while(q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout ) 
           return std::string();
    }
    std::string ret = q.front();
    q.pop();
    return ret;
}

It is shorter and does not have duplicate code like yours did. The only issue is that it may wait longer than the timeout in total: each time wait_for returns from a spurious wakeup, the full timeout starts over. To prevent that, you would need to remember the start time before the loop, check for the timeout, and adjust the wait time accordingly; or specify an absolute time on the wait condition.

Slava
  • 43,454
  • 1
  • 47
  • 90
1

There is also a GLib solution for this case. I have not tried it yet, but I believe it is a good one: https://developer.gnome.org/glib/2.36/glib-Asynchronous-Queues.html#g-async-queue-new

ransh
  • 1,589
  • 4
  • 30
  • 56
1

BlockingCollection is a C++11 thread-safe collection class that provides support for queue, stack, and priority containers. It handles the "empty" queue scenario you described, as well as a "full" queue.

gm127
  • 49
  • 4
1

This is my implementation of a thread-queue in C++20:

#pragma once
#include <deque>
#include <mutex>
#include <condition_variable>
#include <utility>
#include <concepts>
#include <list>

template<typename QueueType>
concept thread_queue_concept =
    std::same_as<QueueType, std::deque<typename QueueType::value_type, typename QueueType::allocator_type>>
    || std::same_as<QueueType, std::list<typename QueueType::value_type, typename QueueType::allocator_type>>;

template<typename QueueType>
    requires thread_queue_concept<QueueType>
struct thread_queue
{
    using value_type = typename QueueType::value_type;
    thread_queue();
    explicit thread_queue( typename QueueType::allocator_type const &alloc );
    thread_queue( thread_queue &&other );
    thread_queue &operator =( thread_queue const &other );
    thread_queue &operator =( thread_queue &&other );
    bool empty() const;
    std::size_t size() const;
    void shrink_to_fit();
    void clear();
    template<typename ... Args>
        requires std::is_constructible_v<typename QueueType::value_type, Args ...>
    void enque( Args &&... args );
    template<typename Producer>
        requires requires( Producer producer ) { { producer() } -> std::same_as<std::pair<bool, typename QueueType::value_type>>; }
    void enqueue_multiple( Producer producer );
    template<typename Consumer>
        requires requires( Consumer consumer, typename QueueType::value_type value ) { { consumer( std::move( value ) ) } -> std::same_as<bool>; }
    void dequeue_multiple( Consumer consumer );
    typename QueueType::value_type dequeue();
    void swap( thread_queue &other );
private:
    mutable std::mutex m_mtx;
    mutable std::condition_variable m_cv;
    QueueType m_queue;
};

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType>::thread_queue()
{
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType>::thread_queue( typename QueueType::allocator_type const &alloc ) :
    m_queue( alloc )
{
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType>::thread_queue( thread_queue &&other )
{
    using namespace std;
    lock_guard lock( other.m_mtx );
    m_queue = move( other.m_queue );
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType> &thread_queue<QueueType>::thread_queue::operator =( thread_queue const &other )
{
    std::lock_guard
        ourLock( m_mtx ),
        otherLock( other.m_mtx );
    m_queue = other.m_queue;
    return *this;
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
thread_queue<QueueType> &thread_queue<QueueType>::thread_queue::operator =( thread_queue &&other )
{
    using namespace std;
    lock_guard
        ourLock( m_mtx ),
        otherLock( other.m_mtx );
    m_queue = move( other.m_queue );
    return *this;
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
bool thread_queue<QueueType>::thread_queue::empty() const
{
    std::lock_guard lock( m_mtx );
    return m_queue.empty();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
std::size_t thread_queue<QueueType>::thread_queue::size() const
{
    std::lock_guard lock( m_mtx );
    return m_queue.size();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
void thread_queue<QueueType>::thread_queue::shrink_to_fit()
{
    std::lock_guard lock( m_mtx );
    return m_queue.shrink_to_fit();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
void thread_queue<QueueType>::thread_queue::clear()
{
    std::lock_guard lock( m_mtx );
    m_queue.clear();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
template<typename ... Args>
    requires std::is_constructible_v<typename QueueType::value_type, Args ...>
void thread_queue<QueueType>::thread_queue::enque( Args &&... args )
{
    using namespace std;
    unique_lock lock( m_mtx );
    m_queue.emplace_front( forward<Args>( args ) ... );
    m_cv.notify_one();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
typename QueueType::value_type thread_queue<QueueType>::thread_queue::dequeue()
{
    using namespace std;
    unique_lock lock( m_mtx );
    while( m_queue.empty() )
        m_cv.wait( lock );
    value_type value = move( m_queue.back() );
    m_queue.pop_back();
    return value;
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
template<typename Producer>
    requires requires( Producer producer ) { { producer() } -> std::same_as<std::pair<bool, typename QueueType::value_type>>; }
void thread_queue<QueueType>::enqueue_multiple( Producer producer )
{
    using namespace std;
    lock_guard lock( m_mtx );
    for( std::pair<bool, value_type> ret; (ret = move( producer() )).first; )
        m_queue.emplace_front( move( ret.second ) ),
        m_cv.notify_one();
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
template<typename Consumer>
    requires requires( Consumer consumer, typename QueueType::value_type value ) { { consumer( std::move( value ) ) } -> std::same_as<bool>; }
void thread_queue<QueueType>::dequeue_multiple( Consumer consumer )
{
    using namespace std;
    unique_lock lock( m_mtx );
    for( ; ; )
    {
        while( m_queue.empty() )
            m_cv.wait( lock );
        try
        {
            bool cont = consumer( move( m_queue.back() ) );
            m_queue.pop_back();
            if( !cont )
                return;
        }
        catch( ... )
        {
            m_queue.pop_back();
            throw;
        }
    }
}

template<typename QueueType>
    requires thread_queue_concept<QueueType>
void thread_queue<QueueType>::thread_queue::swap( thread_queue &other )
{
    std::lock_guard
        ourLock( m_mtx ),
        otherLock( other.m_mtx );
    m_queue.swap( other.m_queue );
}

The only template parameter is QueueType, which can be a std::deque or std::list type, restricted by thread_queue_concept. The class uses this type as the internal queue; choose whichever QueueType is most efficient for your application. I might have restricted the class with a more differentiated thread_queue_concept that checks for all the used parts of QueueType, so that the class would also accept other types compatible with std::list<> or std::deque<>, but I was too lazy to implement that for the unlikely case that someone writes such a type on his own.

One advantage of this code is enqueue_multiple and dequeue_multiple. These functions are given a function object, usually a lambda, which can enqueue or dequeue multiple items with only one locking step. For enqueue this always holds true; for dequeue it depends on whether the queue has elements to fetch.

enqueue_multiple usually makes sense if you have one producer and multiple consumers. It results in longer periods holding the lock, so it pays off only if the items can be produced or moved quickly. dequeue_multiple usually makes sense if you have multiple producers and one consumer. Here we also have longer locking periods, but since objects usually need only a fast move here, this normally doesn't hurt.

If the consumer function object of dequeue_multiple throws an exception while consuming, the exception is caught and the element that was provided to the consumer (as an rvalue reference into the underlying queue's object) is removed.

If you would like to use this class with C++11, you have to remove the concepts or disable them with #if defined(__cpp_concepts).

Bonita Montero
  • 2,817
  • 9
  • 22
0

You may like lfqueue, https://github.com/Taymindis/lfqueue. It's a lock-free concurrent queue. I'm currently using it to consume a queue fed by multiple incoming calls, and it works like a charm.

woon minika
  • 1
  • 1
  • 1