I am trying to split a loop across multiple threads and collect the aggregated results in the main thread using C++ promises and futures, as shown below:

std::vector<std::promise<std::tuple<uint64_t, uint64_t, uint64_t>>> promiseVector;
std::vector<std::future<std::tuple<uint64_t, uint64_t, uint64_t>>> futureVector;
std::vector<std::thread *> threadPool;
                
// create threads
for(int ti = 0, initialval = 0; ti<max_num_threads; ti++, initialval += size_per_thread) {
    // create promise
    promiseVector.push_back(new std::promise<std::tuple<uint64_t, uint64_t, uint64_t>>(std::make_tuple(0, 0, 0)));
    
    // create futures
    futureVector.push_back(promiseVector[ti].get_future());
    
    // create thread and start
    threadPool.push_back(
        new std::thread([&]{
            uint64_t rv1 = 0, rv2 = 0, rv3 = 0;
            
            // some threadwise work
            for (int ri = initialval, i = 0; i < size_per_thread; ri += 8, i++) {
                ...
                ...
                ...
                // update 
                rv1 = ...
                rv2 = ...
                rv3 = ...
            }
            
            promiseVector[ti].set_value(std::make_tuple(local_true_positive, local_false_positive, local_false_negative));
        })
    );
}

// accumulate results from all threads
for(int ti = 0; ti<max_num_threads; ti++) {
    std::tuple<uint64_t, uint64_t, uint64_t> fval = futureVector[ti].get();
    global_val1 += std::get<0>(fval);
    global_val2 += std::get<1>(fval);
    global_val3 += std::get<2>(fval);           
}

// join all threads
for(int ti = 0; ti<max_num_threads; ti++) {
    threadPool[ti]->join();
}

But it fails to compile with the error below:

 error: no matching constructor for initialization of 'std::promise<std::tuple<uint64_t, uint64_t, uint64_t> >' (aka 'promise<tuple<unsigned long, unsigned long, unsigned long> >')
                        promiseVector.push_back(new std::promise<std::tuple<uint64_t, uint64_t, uint64_t>>(std::make_tuple(0, 0, 0)));
                                                    ^                                                      ~~~~~~~~~~~~~~~~~~~~~~~~

I am new to C++ multithreading.

halfer
user3243499

1 Answer

Do not use new in C++ the way you would in Java or C#. C++ has working value semantics for all types, and you should rely on them by default. Use new only when you need to control an object's lifetime manually, and even then strongly prefer smart pointers.

C++ has the using keyword to give complicated types shorter names, and auto to deduce types automatically.

This is how I would write such code:

#include <cstdint>
#include <future>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

using result_t = std::tuple<uint64_t, uint64_t, uint64_t>;

// Store values directly, no pointers needed.
std::vector<std::future<result_t>> futureResults;
std::vector<std::thread> threadPool;

int main()
{
    constexpr int max_num_threads = 8;
    constexpr int size_per_thread = 1;

    // Same unsigned 64-bit type as the tuple elements being accumulated.
    uint64_t global_val1 = 0, global_val2 = 0, global_val3 = 0;

    // Move this definition around if you need to capture something.
    auto worker_fnc = [](std::promise<result_t> promise) {
        uint64_t rv1 = 0, rv2 = 0, rv3 = 0;
        // Work
        promise.set_value(std::make_tuple(rv1, rv2, rv3));
    };

    // create threads
    for (int ti = 0, initialval = 0; ti < max_num_threads; ti++, initialval += size_per_thread) {
        // No need for pointers. A promise cannot be constructed with an initial value.
        std::promise<result_t> promise;

        futureResults.push_back(promise.get_future());

        // Create the thread and hand it its promise.
        threadPool.push_back(std::thread(worker_fnc, std::move(promise)));
    }

    // accumulate results from all threads
    for (int ti = 0; ti < max_num_threads; ti++) {
        auto fval = futureResults[ti].get();
        global_val1 += std::get<0>(fval);
        global_val2 += std::get<1>(fval);
        global_val3 += std::get<2>(fval);
    }

    // join all threads
    for (int ti = 0; ti < max_num_threads; ti++) {
        threadPool[ti].join();
    }
}
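The worker above receives only its promise. If each thread also needs its own starting offset (initialval in the question), one option is to pass it as an extra argument by value, so that later loop iterations cannot change what a running thread sees. The sketch below assumes a made-up workload: count_range and run_partitioned are hypothetical names, and the three tuple slots (sum, even count, odd count) merely stand in for whatever the real loop computes.

```cpp
#include <cstdint>
#include <future>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

using result_t = std::tuple<std::uint64_t, std::uint64_t, std::uint64_t>;

// Hypothetical per-thread work over the half-open range [first, first + count).
// The promise and both integers are taken by value: the thread owns its copies.
void count_range(std::promise<result_t> promise, std::uint64_t first, std::uint64_t count) {
    std::uint64_t sum = 0, evens = 0, odds = 0;
    for (std::uint64_t v = first; v < first + count; ++v) {
        sum += v;
        if (v % 2 == 0) ++evens; else ++odds;
    }
    promise.set_value(std::make_tuple(sum, evens, odds));
}

// Splits [0, num_threads * size_per_thread) across threads and adds up the results.
result_t run_partitioned(int num_threads, std::uint64_t size_per_thread) {
    std::vector<std::future<result_t>> futures;
    std::vector<std::thread> threads;

    std::uint64_t initialval = 0;
    for (int ti = 0; ti < num_threads; ++ti, initialval += size_per_thread) {
        std::promise<result_t> promise;
        futures.push_back(promise.get_future());
        // std::thread copies initialval and size_per_thread into the new thread;
        // the promise is move-only, so it has to be moved explicitly.
        threads.emplace_back(count_range, std::move(promise), initialval, size_per_thread);
    }

    std::uint64_t total_sum = 0, total_evens = 0, total_odds = 0;
    for (auto& f : futures) {
        result_t r = f.get();  // blocks until that thread has called set_value
        total_sum += std::get<0>(r);
        total_evens += std::get<1>(r);
        total_odds += std::get<2>(r);
    }

    for (auto& t : threads) {
        t.join();
    }
    return std::make_tuple(total_sum, total_evens, total_odds);
}
```

Calling run_partitioned(4, 10) would cover the values 0 through 39 with four threads of ten values each. Passing the offset as an argument rather than capturing it with [&] sidesteps the dangling-reference problem in the question's lambda.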

A few notes and pieces of advice:

  • A promise cannot be constructed with an initial value. That is the opposite of how promises are meant to be used: a promise is a placeholder for a result that does not exist yet. This is what the compiler is complaining about.
  • Spend time understanding move semantics. They are used heavily in both your code and mine, and if you store some of these expressions in variables first, the code will behave differently or stop compiling.
  • Overall, understanding when and how C++ copies and moves values is crucial for writing correct code, especially multithreaded code. Java/C#-style reference semantics must be requested explicitly, by using pointers or references, for any type.
  • Understand why the promise variable must be moved explicitly while the std::future and std::thread expressions are moved implicitly: the variable is an lvalue, whereas those expressions are temporaries. None of these three types can be copied.
  • There is no need to hold the promises globally. Only the writer should have access to the promise, and only the reader should know the future.
  • The [&] capture is dangerous, especially with threads, since they will most likely outlive any local variables. For example, capturing the local promise variable by reference instead of passing it by value would lead to undefined behaviour. Another gotcha is capturing loop variables, as the question's lambda does with ti and initialval. Specify captured references explicitly, and consider copying small types. You can even rename the variables with [&new_name_ref = old_name, new_name_copy = old_name2].
  • Look into std::async; it handles threads, promises, and futures for you. It is useful for launching simple tasks, less so when you need thread pools.
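To illustrate the last note, here is a minimal sketch of the same split-and-accumulate pattern using std::async. The workload is a made-up placeholder: summarize_range and run_with_async are hypothetical names, not part of the question's code. Passing std::launch::async requests that each task run on its own thread, and the future returned by std::async replaces the manual promise/thread pair.

```cpp
#include <cstdint>
#include <future>
#include <tuple>
#include <vector>

using result_t = std::tuple<std::uint64_t, std::uint64_t, std::uint64_t>;

// Hypothetical task: returns (sum, even count, odd count) for [first, first + count).
// The result is simply returned; std::async stores it in the shared state for us.
result_t summarize_range(std::uint64_t first, std::uint64_t count) {
    std::uint64_t sum = 0, evens = 0, odds = 0;
    for (std::uint64_t v = first; v < first + count; ++v) {
        sum += v;
        if (v % 2 == 0) ++evens; else ++odds;
    }
    return std::make_tuple(sum, evens, odds);
}

// Launches one asynchronous task per chunk and adds up the partial results.
result_t run_with_async(int num_tasks, std::uint64_t size_per_task) {
    std::vector<std::future<result_t>> futures;

    std::uint64_t first = 0;
    for (int ti = 0; ti < num_tasks; ++ti, first += size_per_task) {
        // Arguments are copied into the task, so each task keeps its own offset.
        futures.push_back(std::async(std::launch::async, summarize_range, first, size_per_task));
    }

    std::uint64_t total_sum = 0, total_evens = 0, total_odds = 0;
    for (auto& f : futures) {
        result_t r = f.get();  // waits for the task and retrieves its return value
        total_sum += std::get<0>(r);
        total_evens += std::get<1>(r);
        total_odds += std::get<2>(r);
    }
    return std::make_tuple(total_sum, total_evens, total_odds);
}
```

There is nothing to join here: get() waits for each task to finish. An exception thrown inside a task would be rethrown from get(), which the manual version only provides if the worker calls set_exception itself.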
Quimby