I have a class that implements a threaded producer/consumer system using a mutex and two condition variables for synchronization. The producer signals the consumer thread when there are items to use, and the consumer signals the producer thread when it has consumed the items. The threads continue producing and consuming until the destructor requests them to quit by setting a boolean variable. Because either of the threads may be waiting on a condition variable, I have to implement a second check of the quit variable, which feels wrong and messy...
I've reduced the problem down to the following (working on GNU/Linux with g++4.7) example:
// C++11and Boost required.
#include <cstdlib> // std::rand()
#include <cassert>
#include <boost/circular_buffer.hpp>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
// Creates a single producer and single consumer thread.
class prosumer
{
public:
// Create the circular buffer and start the producer and consumer thread.
prosumer()
: quit_{ false }
, buffer_{ circular_buffer_capacity }
, producer_{ &prosumer::producer_func, this }
, consumer_{ &prosumer::consumer_func, this }
{}
// Set the quit flag and wait for the threads to exit.
~prosumer()
{
quit_ = true;
producer_.join();
consumer_.join();
}
private:
// Thread entry point for the producer.
void producer_func()
{
// Value to add to the ringbuffer to simulate data.
int counter = 0;
while ( quit_ == false )
{
// Simulate the production of some data.
std::vector< int > produced_items;
const auto items_to_produce = std::rand() % circular_buffer_capacity;
for ( int i = 0; i < items_to_produce; ++i )
{
produced_items.push_back( ++counter );
}
// Get a lock on the circular buffer.
std::unique_lock< std::mutex > lock( buffer_lock_ );
// Wait for the buffer to be emptied or the quit flag to be set.
buffer_is_empty_.wait( lock, [this]()
{
return buffer_.empty() == true || quit_ != false;
} );
// Check if the thread was requested to quit.
if ( quit_ != false )
{
// Don't let the consumer deadlock.
buffer_has_data_.notify_one();
break;
}
// The buffer is locked by this thread. Put the data into it.
buffer_.insert( std::end( buffer_ ), std::begin( produced_items ), std::end( produced_items ) );
// Notify the consumer that the buffer has some data in it.
buffer_has_data_.notify_one();
}
std::cout << "producer thread quit\n";
}
// Thread entry for the consumer.
void consumer_func()
{
int counter_check = 0;
while ( quit_ == false )
{
std::unique_lock< std::mutex > lock( buffer_lock_ );
// Wait for the buffer to have some data before trying to read from it.
buffer_has_data_.wait( lock, [this]()
{
return buffer_.empty() == false || quit_ != false;
} );
// Check if the thread was requested to quit.
if ( quit_ != false )
{
// Don't let the producer deadlock.
buffer_is_empty_.notify_one();
break;
}
// The buffer is locked by this thread. Simulate consuming the data.
for ( auto i : buffer_ ) assert( i == ++counter_check );
buffer_.clear();
// Notify the producer thread that the buffer is empty.
buffer_is_empty_.notify_one();
}
std::cout << "consumer thread quit\n";
}
// How many items the circular buffer can hold.
static const int circular_buffer_capacity = 64;
// Flag set in the destructor to signal the threads to stop.
std::atomic_bool quit_;
// Circular buffer to hold items and a mutex for synchronization.
std::mutex buffer_lock_;
boost::circular_buffer< int > buffer_;
// Condition variables for the threads to signal each other.
std::condition_variable buffer_has_data_;
std::condition_variable buffer_is_empty_;
std::thread producer_;
std::thread consumer_;
};
int main( int argc, char **argv )
{
(void)argc; (void) argv;
prosumer test;
// Let the prosumer work for a little while.
std::this_thread::sleep_for( std::chrono::seconds( 3 ) );
return EXIT_SUCCESS;
}
If you look at the producer_func and consumer_func thread functions you can see that they loop until the quit variable is set by the prosumer destructor, but they also check for the quit variable again after they lock the circular buffer. If the quit variable was set, they signal each other to prevent a deadlock.
Another idea I had was to call notify_one() on the condition variables from the destructor, would that be a better solution?
Is there a better way to do this?
Update 1: I forgot to mention that in this instance, when the threads are requested to exit, the consumer does not need to consume any remaining data in the circular buffer and it's fine if the producer produces a little bit more too. As long as they both exit and don't deadlock all will be well.