5

I have a class that implements a threaded producer/consumer system using a mutex and two condition variables for synchronization. The producer signals the consumer thread when there are items to use, and the consumer signals the producer thread when it has consumed the items. The threads continue producing and consuming until the destructor requests them to quit by setting a boolean variable. Because either of the threads may be waiting on a condition variable, I have to implement a second check of the quit variable, which feels wrong and messy...

I've reduced the problem down to the following (working on GNU/Linux with g++4.7) example:

// C++11and Boost required.
#include <cstdlib> // std::rand()
#include <cassert>

#include <boost/circular_buffer.hpp>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Creates a single producer and single consumer thread.
class prosumer
{
    public:
        // Create the circular buffer and start the producer and consumer thread.
        prosumer()
            : quit_{ false }
            , buffer_{ circular_buffer_capacity }
            , producer_{ &prosumer::producer_func, this }
            , consumer_{ &prosumer::consumer_func, this }
        {}

        // Set the quit flag and wait for the threads to exit.
        ~prosumer()
        {
            quit_ = true;
            producer_.join();
            consumer_.join();
        }

    private:
        // Thread entry point for the producer.
        void producer_func()
        {
            // Value to add to the ringbuffer to simulate data.
            int counter = 0;

            while ( quit_ == false )
            {
                // Simulate the production of some data.
                std::vector< int > produced_items;
                const auto items_to_produce = std::rand() % circular_buffer_capacity;
                for ( int i = 0; i < items_to_produce; ++i )
                {
                    produced_items.push_back( ++counter );
                }

                // Get a lock on the circular buffer.
                std::unique_lock< std::mutex > lock( buffer_lock_ );

                // Wait for the buffer to be emptied or the quit flag to be set.
                buffer_is_empty_.wait( lock, [this]()
                        {
                            return buffer_.empty() == true || quit_ != false;
                        } );

                // Check if the thread was requested to quit.
                if ( quit_ != false )
                {
                    // Don't let the consumer deadlock.
                    buffer_has_data_.notify_one();
                    break;
                }

                // The buffer is locked by this thread. Put the data into it.
                buffer_.insert( std::end( buffer_ ), std::begin( produced_items ), std::end( produced_items ) );

                // Notify the consumer that the buffer has some data in it.
                buffer_has_data_.notify_one();
            }
            std::cout << "producer thread quit\n";
        }


        // Thread entry for the consumer.
        void consumer_func()
        {
            int counter_check = 0;

            while ( quit_ == false )
            {
                std::unique_lock< std::mutex > lock( buffer_lock_ );

                // Wait for the buffer to have some data before trying to read from it.
                buffer_has_data_.wait( lock, [this]()
                        {
                            return buffer_.empty() == false || quit_ != false;
                        } );

                // Check if the thread was requested to quit.
                if ( quit_ != false )
                {
                    // Don't let the producer deadlock.
                    buffer_is_empty_.notify_one();
                    break;
                }

                // The buffer is locked by this thread. Simulate consuming the data.
                for ( auto i : buffer_ ) assert( i == ++counter_check );
                buffer_.clear();

                // Notify the producer thread that the buffer is empty.
                buffer_is_empty_.notify_one();
            }
            std::cout << "consumer thread quit\n";
        }

        // How many items the circular buffer can hold. 
        static const int circular_buffer_capacity = 64;

        // Flag set in the destructor to signal the threads to stop.
        std::atomic_bool quit_;

        // Circular buffer to hold items and a mutex for synchronization.
        std::mutex buffer_lock_;
        boost::circular_buffer< int > buffer_;

        // Condition variables for the threads to signal each other.
        std::condition_variable buffer_has_data_;
        std::condition_variable buffer_is_empty_;

        std::thread producer_;
        std::thread consumer_;
};


int main( int argc, char **argv )
{
    (void)argc; (void) argv;

    prosumer test;

    // Let the prosumer work for a little while.
    std::this_thread::sleep_for( std::chrono::seconds( 3 ) );

    return EXIT_SUCCESS;
}

If you look at the producer_func and consumer_func thread functions you can see that they loop until the quit variable is set by the prosumer destructor, but they also check for the quit variable again after they lock the circular buffer. If the quit variable was set, they signal each other to prevent a deadlock.

Another idea I had was to call notify_one() on the condition variables from the destructor, would that be a better solution?

Is there a better way to do this?

Update 1: I forgot to mention that in this instance, when the threads are requested to exit, the consumer does not need to consume any remaining data in the circular buffer and it's fine if the producer produces a little bit more too. As long as they both exit and don't deadlock all will be well.

x-x
  • 7,287
  • 9
  • 51
  • 78
  • Not related to your question but, are you not locking the buffer_ in producer until you fill the vector from the 2nd iteration ? – Jagannath Jan 04 '13 at 05:34
  • @Jagannath, Sorry, but can you explain further? I don't think I understand your comment. Doesn't condition_variable::wait() reaquire the lock when it unblocks so the circular buffer is protected when the contents of the vector are inserted into it? – x-x Jan 04 '13 at 05:52
  • 1st iteration in producer: You lock the buffer_ and then wait for the condition variable and then insert the data in buffer_ and then signal to consumer by calling notify_one() and then again insert the data in the vector. When is the lock removed on the buffer_ ? – Jagannath Jan 04 '13 at 06:01
  • May be I'm missing some concept here. – Jagannath Jan 04 '13 at 06:02
  • @Jagannath, The std::unique_lock< std::mutex > lock(buffer_lock_) variable goes out of scope immediately after the buffer_has_data_.notify_one() call and unlocks the mutex. Correct me if I'm wrong! – x-x Jan 04 '13 at 06:08
  • Sorry my mistake. Overlooked it. – Jagannath Jan 04 '13 at 06:15
  • I would check how others have implemented it, see [Producer-Consumer problem](http://en.wikipedia.org/wiki/Producer-consumer_problem). In any case, the `quit_` variable should be `volatile` in my opinion. – Ali Jan 04 '13 at 09:04
  • @Ali: It should not. `volatile` is neither necessary nor sufficient for multi-threaded programming. What it needs to be is atomic via `std::atomic<>`. Lucky for us, this particular use-case is further simplified with `std::atomic_flag`. You are correct at seeing the risk though: this code has a read-write conflict on that variable. – GManNickG Jan 04 '13 at 17:29
  • @GManNickG Thanks, I am still learning C++11. In general, what's wrong with `volatile bool`? Some book (I don't remember which) was advocating it for these purposes. I agree, now that we have standard solutions, we should stick to them. Do you mean that we have a nice, thread-safe `test_and_set` for `atomic_flag`, which we cannot do with `volatile bool` without a mutex? – Ali Jan 04 '13 at 17:56
  • 1
    @Ali: `volatile` simply doesn't guarantee atomicity nor visibility. `volatile` is a keyword that says "reads and writes to this variable are *observable behavior* so do all of them", which has the effect of disabling optimizations on that variable. What it doesn't do is say *how* those reads and writes should occur. It's simply a wrong thing that's gone around forever. – GManNickG Jan 04 '13 at 18:30
  • @GManNickG Please expand on "observable behavior" or please give some references where it is explained (for dummies). I am afraid I don't understand it. – Ali Jan 04 '13 at 18:35
  • 1
    @Ali: It's a technical term from the standard. If you [read this](http://stackoverflow.com/questions/6664471/observable-behaviour-and-compiler-freedom-to-eliminate-transform-pieces-c-co) you should be good. Basically, the result of a program is its observable behavior. The result the compiler can optimize `x = 2 + 2;` to `x = 4;` is because you cannot observe the effects of `2 + 2`, so by replacing it with `4` directly it's *as if* it performed the computation. – GManNickG Jan 04 '13 at 18:48
  • @GManNickG I should probably post a question with some code where you can explain what might go wrong. I should also dig up that book that was advocating `volatile bool`. I am sure you are right but I would like to fully understand you. Please give me some time. I will let you know about the question. Thanks! – Ali Jan 04 '13 at 18:54
  • @GManNickG OK, i see i should work myself through that post you linked to, it contains interesting info. Thanks! :) – Ali Jan 04 '13 at 18:56
  • let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/22201/discussion-between-gmannickg-and-ali) – GManNickG Jan 04 '13 at 18:57
  • @GManNickG, Why would the quit flag need to be std::atomic or std::atomic_flag? Isn't the difference between false and !false a single bit? Surely changing a single bit is atomic at the hardware level of all processors? – x-x Jan 04 '13 at 20:31
  • @DrTwox: No. Besides, processors generally don't operate on individual bits, so you need to get that guarantee for the native word size. In any case, why bother wondering and hoping when you can just use the utilities that exist? If it turns out no additional work is needed on your particular hardware, then they will optiimize as so. (Or put another way: if we had these kinds of guarantees, the standards committee wouldn't have spend years getting the memory model formnalized along with the entire threading and atomic library.) – GManNickG Jan 04 '13 at 20:53
  • @GManNickG, If a boolean variable is false, then all the bits are 0. If a boolean variable is !false, then one or more bits are 1. Is that a correct way of looking at bool in C++? "...processors generally don't operate on individual bits..." Exactly my point! If changing a single bit in a bool modifies if from false to !false, surely that must be atomic? What am I not understanding or missing here? – x-x Jan 04 '13 at 21:08
  • @DrTwox: The implementation of `bool` is completely unspecified. `sizeof(bool)` could be 1, 2, 4, or 128 if your compiler felt like it. The value representation of `false` could be all zeros, all ones, or some magic bit pattern, likewise with `true`. It's just totally unspecified. What you are missing is this: the C++ language provides no guarantees of atomicity or visibility without `std::atomic`. So if you don't have it, there's no point in wondering. Sure, you can continue to investigate it on a particular compiler, on a particular platform, etc., but that's not C++. It's C++ on platform X. – GManNickG Jan 04 '13 at 21:30
  • @GManNickG, Thank you for the clarification! I was lead to believe in C++ that bool was defined, in the standard, as false = 0, and true = !false, thus making the sizeof(bool) irrelevant to my point. It was that assumption that caused the misunderstanding. Thanks again for clearing that up! – x-x Jan 04 '13 at 21:43
  • @DrTwox: No problem. When a boolean that is `false` is *converted* to an integer, the result is indeed `0`, but whether or not the boolean is *stored* as an integer with the value 0 is unspecified. `true` is indeed `!false`. ' – GManNickG Jan 04 '13 at 22:03
  • If you don't mind using boost threads and condition variables instead, you could use `thread::interrupt()`. – Anonymous Coward Jan 05 '13 at 02:02

2 Answers2

3

In my opinion, calling notify_one (or rather notify_all if you were to extend your buffer to multiple producers/consumers) on both condition variables in the destructor before the calls to join would be the preferred solution for several reasons:

Firstly, this matches the way that conditional variables are typically used: By setting quit_, you change the state that the producer/consumer threads are interested in and wait for, so you should notify them of the state change.

Furthermore, notify_one should not be a very costly operation.

Also, in a more realistic application, it could be the case that in between the production of two elements, there is a delay; in that case you may not want to block in your destructor until the consumer notices it has to cancel as soon as the next element is enqueued; in the example code, that does not occur, as far as I can see.

  • 1
    I forgot to mention that you should probably lock your mutex before setting quit_ to true or use some other way (like atomics, but I am not sure whether that leaves a possible race condition) to prevent UB - I believe the compiler may, with your code, be free to optimize the quit_ checks away, though I am not too sure about that. – Phillip Keldenich Jan 04 '13 at 17:26
  • Thanks for the feedback. I updated the question to cover the last paragraph in your answer. In the actual application, it doesn't matter if the there are items remaining in the circular buffer when the destructor is called. – x-x Jan 05 '13 at 00:34
1

In my opinion, there are two functionalities that can be separated:

  1. message passing and dispatching
  2. producing and consuming

It does make sense to really separate them: the 'worker' thread does nothing more than process 'messages' that could mean 'quit' or 'do_work'.

This way you can create a generic 'worker' class that aggregates the actual function. The produce and consume methods stay clean, and the worker class care only about keeping the work going.

xtofl
  • 40,723
  • 12
  • 105
  • 192
  • Can you elaborate? Presumably at some point the produce and consume methods are still going to have to check some state to determine if the they should continue or exit? The prosumer class, as I see it, is already handling message passing and dispatching with the quit variable, which is the only message the threads need to know about. – x-x Jan 05 '13 at 00:26