
With the new C++17 standard, I wonder if there is a good way to run a fixed number of threads until a batch of jobs is finished.

Can you tell me how I can achieve the desired functionality of this code:

std::vector<std::future<std::string>> futureStore;
const int batchSize             = 1000;
const int maxNumParallelThreads = 10;
int threadsTerminated           = 0;

while(threadsTerminated < batchSize)
{
    // note: size() must be re-read each iteration; a const reference bound
    // to the temporary returned by size() would never change
    while(futureStore.size() < static_cast<size_t>(maxNumParallelThreads))
    {
        futureStore.emplace_back(std::async(someFunction));
    }
    // std::when_any is hypothetical here -- it is not part of the standard
    for(std::future<std::string>& readyFuture: std::when_any(futureStore.begin(), futureStore.end()))
    {
        auto retVal = readyFuture.get(); 
        // (possibly do something with the ret val)
        threadsTerminated++;
    }
} 

I read that an std::when_any function was once proposed, but it was a feature that did not make it into the standard.

Is there any support for this functionality (not necessarily for std::future-s) in the current standard library? Is there a way to implement it easily, or do I have to resort to something like this?

Adam Hunyadi

2 Answers

2

This does not seem to me to be the ideal approach:

  1. All your main thread does is wait for the other threads to finish, polling the results of your futures. This thread is almost wasted...

  2. I don't know to what extent std::async re-uses the threads' infrastructure in any suitable way, so you risk creating entirely new threads each time... (Apart from that, you might not create any threads at all, see here, if you do not specify std::launch::async explicitly.)

I personally would prefer another approach:

  1. Create all the threads you want to use at once.
  2. Let each thread run a loop, repeatedly calling someFunction(), until the desired number of tasks has been reached.

The implementation might look similar to this example:

#include <chrono>
#include <cstdio>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

const int BatchSize = 20;
int tasksStarted = 0;
std::mutex mutex;
std::vector<std::string> results;

std::string someFunction()
{
    puts("worker started"); fflush(stdout);
    std::this_thread::sleep_for(std::chrono::seconds(2)); // portable, unlike POSIX sleep(2)
    puts("worker done"); fflush(stdout);
    return "";
}

void runner()
{
    {
        std::lock_guard<std::mutex> lk(mutex);
        if(tasksStarted >= BatchSize)
            return;
        ++tasksStarted;
    }
    for(;;)
    {
        std::string s = someFunction();
        {
            std::lock_guard<std::mutex> lk(mutex);
            results.push_back(s);
            if(tasksStarted >= BatchSize)
                break;
            ++tasksStarted;
        }
    }
}

int main(int argc, char* argv[])
{
    const int MaxNumParallelThreads = 4;

    std::thread threads[MaxNumParallelThreads - 1]; // main thread is one, too!
    for(int i = 0; i < MaxNumParallelThreads - 1; ++i)
    {
        threads[i] = std::thread(&runner);
    }
    runner();

    for(int i = 0; i < MaxNumParallelThreads - 1; ++i)
    {
        threads[i].join();
    }

    // use results...

    return 0;
}

This way, you do not repeatedly create new threads; each thread just continues until all tasks are done.

If these tasks are not all alike as in the example above, you might create a base class Task with a pure virtual function (e. g. "execute" or "operator ()") and create subclasses with the required implementation (holding any necessary data).

You could then place the instances into a std::vector or std::list (well, we won't iterate, so a list might be appropriate here...) as pointers (otherwise, you get object slicing!) and let each thread remove one of the tasks when it has finished its previous one (do not forget to protect against race conditions!) and execute it. As soon as no more tasks are left, return...

Aconcagua
  • The default behaviour of std::async is `std::launch::async | std::launch::deferred` and will most likely use some sort of thread pool. It is true that there is no guarantee that new threads will be spawned, but that is intentional, because thread creation might not be possible. In this case, the code will still execute correctly, instead of throwing an exception. After all, I think the std::async solution is way more elegant. – OutOfBound Jun 13 '17 at 09:17
  • Perhaps a matter of taste. In this specific case, I don't consider it more elegant for one specific reason: There is no condition variable available to be signalled from multiple threads waking up a single one (as discussed [here](https://stackoverflow.com/questions/27218590/stdcondition-variable-wait-for-several-threads-to-notify-observer))... So you need to fall back into polling in the main thread... – Aconcagua Jun 13 '17 at 09:21
  • Wouldn't this wait for the *first launched* thread instead of any that is finished? I have threads with very unpredictable execution times, and this would wait for the slowest of them. Polling the threads is more effective than doing this. – Adam Hunyadi Jun 13 '17 at 09:47
  • @AdamHunyadi What exactly do you mean by 'this'? Joining? That will only occur when no tasks are left any more and makes sure that we do not proceed until *all* tasks are finished. Or the condition variable I mentioned? Well, problem is: *there isn't any* suitable for our use case! We have one allowing a single thread waking up multiple others, but none allowing any arbitrary thread waking up a specific one... (at least not in standard c++). – Aconcagua Jun 13 '17 at 09:56
  • @Aconcagua Say I have got a job for the threads that either takes 10 ms to compute, or times out after about 1000 milliseconds and returns an object indicating that no answer was found. With even a single such thread, if I join the threads in a fixed order (i.e. not in the order they finish), I lose most of the performance boost I get from having the threads, whenever one thread in the thread store takes long to execute. – Adam Hunyadi Jun 14 '17 at 13:28
  • I think what you would do with your example is put it into a for cycle inside the main() and run until the batch is done. If I interpreted your code incorrectly, please by all means tell me. – Adam Hunyadi Jun 14 '17 at 13:37
  • @AdamHunyadi Have a closer look at the runner function! The idea is as follows: We start a fixed number of threads and have some predefined number of tasks (counter or list or...). Each thread runs within a loop and takes a new (unstarted!) task as soon as it has processed its current one. So there won't be any interruption for any thread as long as there are tasks left (including the main task, as it calls runner, too). Only if there is no unstarted task left will the threads return from processing (including the main thread). – Aconcagua Jun 14 '17 at 15:48
  • @AdamHunyadi There wouldn't have been any issue if I had detached all the threads - *except* that if the main task completes its current task before any one of the worker threads, it might try to evaluate results not yet provided by the unfinished workers. This is the reason why main has to wait for *all* the workers to complete. So we simply join all threads in any arbitrary order, as we need to wait for all of them anyway. The delay from not joining immediately as soon as a thread has finished, but possibly after others yet running, should be negligible. – Aconcagua Jun 14 '17 at 15:48
  • @AdamHunyadi If you prove me wrong with this, then you might try to really detach the threads and maintain a counter for all yet running threads. As soon as a thread discovers that no more tasks are left, it decreases this counter, and the last one (`--counter == 0`) would then notify a condition variable main is waiting for (instead of joining). – Aconcagua Jun 14 '17 at 15:48
  • Well, how would you process your whole batch with this approach? In your example you start ```MaxNumParallelThreads``` threads and wait for all of them, don't you? – Adam Hunyadi Jun 15 '17 at 07:42
  • Easy: each thread started does not process one single task, but a whole bunch of them! As all process the same task *again and again*, each thread will (roughly) execute `batchSize/maxNumParallelThreads` tasks. – Aconcagua Jun 15 '17 at 08:34
1

If you don't care about the exact number of threads, the simplest solution would be:

std::vector<std::future<std::string>> futureStore(
    batchSize
);

std::generate(futureStore.begin(), futureStore.end(), [](){return std::async(someTask);});


for(auto& future : futureStore) {
    std::string value = future.get();
    doWork(value);
}

From my experience, std::async will reuse threads after a certain number of threads has been spawned. It will not spawn 1000 threads. Also, you will not gain much of a performance boost (if any) by using a thread pool. I did measurements in the past, and the overall runtime was nearly identical.

The only reason I use thread pools now is to avoid the delay of creating threads inside the computation loop. If you have timing constraints, you may miss deadlines when using std::async for the first time, since it will create the threads on the first calls.

There is a good thread pool library for these applications. Have a look here: https://github.com/vit-vit/ctpl

#include <ctpl.h>

const unsigned int numberOfThreads = 10;
const unsigned int batchSize = 1000;

ctpl::thread_pool pool(numberOfThreads /* not batchSize -- the pool should stay small */);
std::vector<std::future<std::string>> futureStore(
    batchSize
);

// ctpl passes the thread id as the first argument, and the lambda must capture pool
std::generate(futureStore.begin(), futureStore.end(),
              [&pool](){ return pool.push([](int){ return someTask(); }); });

for(auto& future : futureStore) {
    std::string value = future.get();
    doWork(value);
}
OutOfBound
  • Yes, as the title of the question says, I want to maximize the number of threads. I am working on a supercomputer with many-many cores, but I'm not the only user, so I should have a limit on the threads I spawn. – Adam Hunyadi Jun 13 '17 at 09:40
  • If you want to maximize the number of threads for performance reasons, then you don't need to care about it too much, since it will not become faster if you have more threads than hardware threads, and std::async will spawn threads up to this point. If you want more threads for other reasons, then ctpl is the way to go. I will add an example for you. – OutOfBound Jun 13 '17 at 09:45
  • I don't want to utilize all the cores, since I am not the only user of the supercomputer I'm working on. I need to share the resources with other users. – Adam Hunyadi Jun 13 '17 at 09:48
  • On most supercomputers, you have a job submission system that will restrict the amount of resources you can use. If you want to limit your cores, then you will do this in the submission of your job, and your program will be forced to only use that many cores. – OutOfBound Jun 13 '17 at 09:53
  • @NicolBolas you are right. I should have used std::generate instead. I will fix the error as soon as possible. – OutOfBound Jun 13 '17 at 16:49
  • @NicolBolas Fixed it and did a test run. – OutOfBound Jun 14 '17 at 07:19