
I am trying to create a sort of thread pool that runs functions on separate threads and only starts a new iteration when all functions have finished.

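#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

using namespace std;

map<size_t, bool> status_map;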
vector<thread> threads;
condition_variable cond;

bool are_all_ready() {
  mutex m;
  unique_lock<mutex> lock(m);
  for (const auto& [_, status] : status_map) {
    if (!status) {
      return false;
    }
  }
  return true;
}

void do_little_work(size_t id) {
  this_thread::sleep_for(chrono::seconds(1));
  cout << id << " did little work..." << endl;
}

void do_some_work(size_t id) {
  this_thread::sleep_for(chrono::seconds(2));
  cout << id << " did some work..." << endl;
}

void do_much_work(size_t id) {
  this_thread::sleep_for(chrono::seconds(4));
  cout << id << " did much work..." << endl;
}

void run(const function<void(size_t)>& function, size_t id) {
  while (true) {
    mutex m;
    unique_lock<mutex> lock(m);

    cond.wait(lock, are_all_ready);

    status_map[id] = false;
    cond.notify_all();

    function(id);

    status_map[id] = true;
    cond.notify_all();
  }
}
 
int main() {
  threads.push_back(thread(run, do_little_work, 0));
  threads.push_back(thread(run, do_some_work, 1));
  threads.push_back(thread(run, do_much_work, 2));

  for (auto& thread : threads) {
    thread.join();
  }

  return EXIT_SUCCESS;
}

I expect to get the output:

0 did little work...
1 did some work...
2 did much work...
0 did little work...
1 did some work...
2 did much work...
        .
        .
        .

after the respective delays, but when I run the program I only get

0 did little work...
0 did little work...
        .
        .
        .

I should also say that I'm rather new to multithreading, but in my understanding, the condition_variable should do the task of blocking every thread until the predicate returns true. And in my case, are_all_ready should return true after all functions have returned.

Symlink
  • Could you tidy up your example into a [minimal, reproducible example](https://stackoverflow.com/help/minimal-reproducible-example)? At the very least you're missing includes, a `using` directive, and you've got lots of distracting code commented out. – pilcrow Mar 25 '21 at 21:16

2 Answers


As-is, your program has undefined behavior (UB), which can show up as a crash, because of unsynchronized concurrent access to status_map.

When you do:

void run(const function<void(size_t)>& function, size_t id)
{
...
    mutex m;
    unique_lock<mutex> lock(m);
...
    status_map[id] = false;

the mutex and the lock on it are local variables, so every thread (and every loop iteration) gets its own independent mutex, and locking it excludes no one. So nothing prevents multiple threads from writing to status_map at once, which is a data race; on my machine, that crashes.

Now, if you make the mutex static, so that all threads share it, only one thread can access the map at a time. But since the lock is still held while function(id) runs, it also means only one thread does any work at a time. With that change I see 0, 1 and 2 running, but only one at a time, with a strong tendency for the thread that just ran to run again.
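A minimal sketch of the "one shared mutex, lock only around the map" idea, assuming C++17. The helpers set_status, run_n and run_all are hypothetical names, and the fixed round count replaces while (true) only so the example terminates. It removes the data race and lets the workers run concurrently, but on its own it does nothing to make them start each batch together.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

// One mutex at namespace scope, shared by every thread (not recreated per call).
std::mutex status_mutex;
std::map<std::size_t, bool> status_map;  // id -> idle flag, guarded by status_mutex

void set_status(std::size_t id, bool idle) {
  std::lock_guard<std::mutex> lock(status_mutex);  // held only for the write
  status_map[id] = idle;
}

bool are_all_ready() {
  std::lock_guard<std::mutex> lock(status_mutex);  // same mutex for the reads
  for (const auto& [worker_id, idle] : status_map) {
    if (!idle) {
      return false;
    }
  }
  return true;
}

// Hypothetical worker loop with a fixed number of rounds instead of while (true).
void run_n(const std::function<void(std::size_t)>& work, std::size_t id, int rounds) {
  for (int i = 0; i < rounds; ++i) {
    set_status(id, false);
    work(id);  // no lock is held here, so other workers can run concurrently
    set_status(id, true);
  }
}

// Starts one thread per job (each thread gets its own copy of the job) and joins them.
void run_all(const std::vector<std::function<void(std::size_t)>>& jobs, int rounds) {
  std::vector<std::thread> threads;
  for (std::size_t id = 0; id < jobs.size(); ++id) {
    threads.emplace_back(run_n, jobs[id], id, rounds);
  }
  for (auto& t : threads) {
    t.join();
  }
}
```

Note that are_all_ready locks status_mutex itself, so it must not be used as a cond.wait predicate while status_mutex is already held: locking a std::mutex the calling thread already owns is undefined behavior.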

My suggestion: go back to the drawing board and make it simpler. Run all threads at once, use a single mutex to protect the map, and lock that mutex only while accessing the map. And... well, in fact, I don't even see the need for a condition variable.

For example, what is wrong with:

#include <chrono>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <thread>
#include <vector>

using namespace std;

vector<thread> threads;

void do_little_work(size_t id) {
  this_thread::sleep_for(chrono::seconds(1));
  cout << id << " did little work..." << endl;
}

void do_some_work(size_t id) {
  this_thread::sleep_for(chrono::seconds(2));
  cout << id << " did some work..." << endl;
}

void do_much_work(size_t id) {
  this_thread::sleep_for(chrono::seconds(4));
  cout << id << " did much work..." << endl;
}

void run(const function<void(size_t)>& function, size_t id) {
  while (true) {
    function(id);
  }
}

int main() {
  threads.push_back(thread(run, do_little_work, 0));
  threads.push_back(thread(run, do_some_work, 1));
  threads.push_back(thread(run, do_much_work, 2));

  for (auto& thread : threads) {
    thread.join();
  }

  return EXIT_SUCCESS;
}
Jeffrey
  • So do I have to create one mutex and then create all locks with this mutex, or do I have to create one lock in general? – Symlink Mar 24 '21 at 15:36
  • I don't think that design simplification will work. The OP wants the worker threads to process in batches, pausing at a "cyclic barrier" (or "countdown latch") after each has done one unit of work. – pilcrow Mar 24 '21 at 16:17
  • I had missed the batch part. In that case, I think `std::counting_semaphore` would be better than a `condition_variable`, but in any case the current structure doesn't hold. It needs shared mutexes to protect shared resources, and counting where counting is needed. – Jeffrey Mar 24 '21 at 17:32

There are several ways to do this.

Easiest in my opinion would be a C++20 std::barrier, which says, "wait until all of N threads have arrived and are waiting here."

#include <barrier>

std::barrier synch_workers(3);
....
void run(const std::function<void(size_t)>& func, size_t id) {
  while (true) {
    synch_workers.arrive_and_wait(); // wait for all three to be ready
    func(id);
  }
}

Cruder and less efficient, but equally effective, would be to construct and join() new sets of three worker threads for each "batch" of work:

int main(...) {
  std::vector<std::thread> threads;
  ...
  while (flag_running) {
    threads.push_back(...);
    threads.push_back(...);
    ...
    for (auto& thread : threads) {
      thread.join();
    }
    threads.clear();
  }
}
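A runnable take on the join-per-batch idea, assuming a fixed number of batches in place of flag_running (whose definition the answer leaves out). The name run_batches_by_joining is hypothetical. The cost is creating and destroying one thread per job per batch, which is the inefficiency mentioned above.

```cpp
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical helper: a fresh set of threads for each batch. join() is the
// only synchronization point, so batch b + 1 cannot start before batch b is done.
void run_batches_by_joining(
    const std::vector<std::function<void(std::size_t)>>& jobs, int batches) {
  std::vector<std::thread> threads;
  for (int b = 0; b < batches; ++b) {
    for (std::size_t id = 0; id < jobs.size(); ++id) {
      threads.emplace_back(jobs[id], id);  // each thread gets its own copy of the job
    }
    for (auto& t : threads) {
      t.join();
    }
    threads.clear();  // joined threads are no longer joinable, so discard them
  }
}
```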

Aside

I'd suggest you revisit some core synchronization concepts, however. You create a new mutex in each call when you want every thread to re-use one shared mutex. The scope of your unique_lock isn't quite right either: it stays locked while function(id) runs.

Now, your idea of tracking each worker thread's "busy/idle" state in a map is straightforward, but it cannot correctly coordinate "batches" or "rounds" of work that must begin at the same time.

If a worker sees in the map that two of three threads, including itself, are "idle", what does that mean? Is a "batch" of work concluding — i.e., two workers are waiting for a tardy third? Or has a batch just begun — i.e., the two idle threads are tardy and had better get to work like their more eager peer?

The threads cannot know the answer without keeping track of the current batch of work, which is what a barrier (or its more complex cousin the phaser) does under the hood.
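To illustrate the kind of bookkeeping meant here, the following is a hypothetical GenerationBarrier, a classic mutex-plus-condition_variable cyclic barrier (C++14 or later). It is a sketch of the general technique, not how any particular std::barrier implementation works. Every thread shares one mutex, and the generation counter tells a waiting thread whether the batch it arrived in has completed, which is exactly the question a busy/idle map cannot answer. run_lockstep is a hypothetical usage driver.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reusable ("cyclic") barrier built from ONE shared mutex and one
// condition_variable. The generation counter records which batch is current.
class GenerationBarrier {
 public:
  explicit GenerationBarrier(std::size_t count) : count_(count) {}  // count must be > 0

  void arrive_and_wait() {
    std::unique_lock<std::mutex> lock(mutex_);  // every thread locks the same mutex
    const std::size_t my_generation = generation_;
    if (++waiting_ == count_) {
      // Last thread to arrive: reset the count, open the next generation, wake all.
      waiting_ = 0;
      ++generation_;
      cond_.notify_all();
    } else {
      // Predicate form handles spurious wakeups; wait until our generation ends.
      cond_.wait(lock, [&] { return generation_ != my_generation; });
    }
  }

 private:
  std::mutex mutex_;
  std::condition_variable cond_;
  const std::size_t count_;
  std::size_t waiting_ = 0;
  std::size_t generation_ = 0;
};

// Usage sketch: each of `workers` threads does `rounds` rounds, meeting at the
// barrier before each one, and records the round number it just worked on.
std::vector<int> run_lockstep(std::size_t workers, int rounds) {
  GenerationBarrier barrier(workers);
  std::mutex order_mutex;
  std::vector<int> order;  // guarded by order_mutex
  std::vector<std::thread> threads;
  for (std::size_t id = 0; id < workers; ++id) {
    threads.emplace_back([&] {
      for (int r = 0; r < rounds; ++r) {
        barrier.arrive_and_wait();
        std::lock_guard<std::mutex> lock(order_mutex);
        order.push_back(r);
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  return order;
}
```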

pilcrow