I have been trying to figure out std::condition_variables and I am particularly confused by wait() and whether to use notify_all or notify_one.
First, I've written some code and attached it below. Here's a short explanation: Collection is a class that holds onto a bunch of Counter objects. These Counter objects have a Counter::increment() method, which needs to be called on all the objects, over and over again. To speed everything up, Collection also maintains a thread pool to distribute the work over, and sends out all the work with its Collection::increment_all() method.
These threads don't need to communicate with each other, and there are usually many more Counter objects than there are threads. It's fine if one thread processes more Counter objects than others, just as long as all the work gets done. Adding work to the queue is easy and only needs to be done in the "main" thread. As far as I can see, the only bad thing that can happen is if other methods (e.g. Collection::printCounts) are allowed to be called on the counters in the middle of the work being done.
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>

class Counter{
private:
    int m_count;
public:
    Counter() : m_count(0) {}
    void increment() {
        m_count++;
    }
    int getCount() const { return m_count; }
};

class Collection{
public:
    Collection(unsigned num_threads, unsigned num_counters)
        : m_shutdown(false)
    {
        // start workers
        for(size_t i = 0; i < num_threads; ++i){
            m_threads.push_back(std::thread(&Collection::work, this));
        }
        // instantiate counters
        for(size_t j = 0; j < num_counters; ++j){
            m_counters.emplace_back();
        }
    }

    ~Collection()
    {
        m_shutdown = true;
        for(auto& t : m_threads){
            if(t.joinable()){
                t.join();
            }
        }
    }

    void printCounts() {
        // wait for work to be done
        std::unique_lock<std::mutex> lk(m_mtx);
        m_work_complete.wait(lk); // q2: do I need a while loop?
        // print all current counters
        for(const auto& cntr : m_counters){
            std::cout << cntr.getCount() << ", ";
        }
        std::cout << "\n";
    }

    void increment_all()
    {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_work_complete.wait(lock);
        for(size_t i = 0; i < m_counters.size(); ++i){
            m_which_counters_have_work.push(i);
        }
    }

private:
    void work()
    {
        while(!m_shutdown){
            bool action = false;
            unsigned which_counter;
            {
                std::unique_lock<std::mutex> lock(m_mtx);
                if(m_which_counters_have_work.size()){
                    which_counter = m_which_counters_have_work.front();
                    m_which_counters_have_work.pop();
                    action = true;
                }else{
                    m_work_complete.notify_one(); // q1: notify_all
                }
            }
            if(action){
                m_counters[which_counter].increment();
            }
        }
    }

    std::vector<Counter> m_counters;
    std::vector<std::thread> m_threads;
    std::condition_variable m_work_complete;
    std::mutex m_mtx;
    std::queue<unsigned> m_which_counters_have_work;
    bool m_shutdown;
};

int main() {
    int num_threads = std::thread::hardware_concurrency()-1;
    int num_counters = 10;
    Collection myCollection(num_threads, num_counters);
    myCollection.printCounts();
    myCollection.increment_all();
    myCollection.printCounts();
    myCollection.increment_all();
    myCollection.printCounts();
    return 0;
}
I compile this on Ubuntu 18.04 with g++ -std=c++17 -pthread thread_pool.cpp -o tp && ./tp
I think the code accomplishes all of those objectives, but a few questions remain:
I am using m_work_complete.wait(lk) to make sure the work is finished before I start printing all the new counts. Why do I sometimes see this written inside a while loop, or with a second argument as a lambda predicate function? These docs mention spurious wake-ups. If a spurious wake-up occurs, does that mean printCounts could prematurely print? If so, I don't want that. I just want to ensure the work queue is empty before I start using the numbers that should be there.

I am using m_work_complete.notify_all instead of m_work_complete.notify_one. I've read this thread, and I don't think it matters--only the main thread is going to be blocked by this. Is it faster to use notify_one just so the other threads don't have to worry about it?