0
void runner(int id, int a, int b) {...}

void caller()
{
  static ctpl::thread_pool pool(4);
  for (int i = 0; i < nbThreads; i++) {
    pool.push(runner, a, b);
  }
  pool.stop(true); // this kills all threads, meaning next time this function is called i have to create them again
}

So I'm using this library here: https://github.com/vit-vit/ctpl

However, in the example, I did not see any function to synchronize the threads. For std::thread, that would be th.join(). pool.get_thread(i).join() is not the solution; it freezes the because the thread is running always running to accept new commands, which is its intended behavior.

The only way I see that work is pool.stop(true). But if I do this, the threads are destroyed and I have to create them again next time, which defeats the point of a thread pool.

Can someone who use this library show me how to synchronize threads?

Huy Le
  • 1,439
  • 4
  • 19
  • have you read through this post: https://stackoverflow.com/questions/15752659/thread-pooling-in-c11 basically -- There is a difference between a "thread pool" and a collection of threads. I am thinking you want the latter. From the link: `A pool of threads means that all your threads are running, all the time – in other words, the thread function never returns.` – Andy Aug 01 '20 at 06:09
  • with the latter, i have to create new threads each time a function is called, which cost time. I'm looking for a way so that threads only need to be created once, then reuse – Huy Le Aug 01 '20 at 07:21
  • 2
    ??? why are you trying to stop/join the pool if you need it later? Just create it once, at program initialization, and then.....don't stop it! – Martin James Aug 02 '20 at 06:51

2 Answers2

1

It seems as though you misunderstand the usefulness of CTPL.

From CTPL README.md:

... The jobs come to the pool dynamically. A job is fetched and deleted from the container when there is an idle thread. The job is then run on that thread.

A thread pool is helpful when you want to minimize time of loading and destroying threads and when you want to limit the number of parallel jobs that run simultaneously ...

If you want to run the thread pool once and then return the function once all the threads are done simply do the following:

void runner(int id, int a, int b) {...}

void caller()
{
  int nbThreads = 4;
  ctpl::thread_pool pool(4);
  for (int i = 0; i < nbThreads; i++) {
    pool.push(runner, a, b);
  }
}

The caller function will not return until the thread_pool's threads have finished. This effectively synchronizes the threads.


The benefit of CTPL's thread_pool is you can push a job to the thread pool without worrying about the status of the other threads. e.g.

void runner(int id, int a, int b) {...}
void runner2(int id) {...}

// I would wrap this in it's own class...
static ctpl::thread_pool pool(4);

void caller()
{
  int nbThreads = 25;
  for (int i = 0; i < nbThreads; i++) {
    pool.push(runner, a, b);
  }
}

// caller and caller2 called at different times in main thread.
void caller2(int jobs)
{
   for (int i = 0; i < jobs; i++) {
     pool.push(runner2);
   }
}

This example should synchronize all threads before the process ends.


Based on the answer you provided yourself, the functionality you desire seems to differ from what the library provides. The biggest issue is that if you're waiting in a for-loop you're wasting time. In your comment, you say that creating new threads every time you call caller would waste time. In your answer, what if results[0] takes ten minutes to run but rest of the results finished in seconds?

If you truly want to run N jobs and wait for them to be done while using the CTPL library. I recommend altering your answer a bit to at catch any exceptions that might have been thrown during the jobs.

void runner(int id, int a, int b) {...}

void caller()
{
  int jobs = 35;
  static ctpl::thread_pool pool(4);
  std::vector<std::future<void>> job_results;
  for (int i = 0; i < jobs; i++) {
    job_results.push_back(pool.push(runner, a, b));
  }
  for (std::future<void> result : job_results) {
    try {
      results.get();
    } catch (const std::exception& e) {
      ... // handle exception.
    }
  }
}

However, you would be best off just using a static vector of std::threads for the functionality you want. That way you don't have any 3rd party dependencies.

nwfistere
  • 355
  • 4
  • 15
0

Use std::future to get the result of each thread. Then use wait() on std::future objects.

std::future<void> results[nbThreads];
for (int i=0; i<nbThreads; i++) results[i] = pool.push(runner, a, b);
for (int i=0; i<nbThreads; i++) results[i].wait(); // synchronize all threads
Huy Le
  • 1,439
  • 4
  • 19