I was looking into how to do some parallelism using only the C++ standard library and found the following bit of code in another question here on Stack Overflow:

template <typename RAIter> //FOUND ON STACK OVERFLOW
int parallel_sum(RAIter beg, RAIter end)
{
    auto len = end - beg;
    if (len < 1000)
        return std::accumulate(beg, end, 0);

    RAIter mid = beg + len / 2;
    auto handle = std::async(std::launch::async,
        parallel_sum<RAIter>, mid, end);
    int sum = parallel_sum(beg, mid);
    return sum + handle.get();
}

I wanted to make a general parallel_for_each function that loops over a (hopefully) arbitrary container type and applies an algorithm to each entry so I modified the above to the following:

template <typename ContainerIterator, typename containerSizeType, typename AlgorithmPerEntry> //modified version of parallel sum code above : https://stackoverflow.com/questions/36246300/parallel-loops-in-c
void parallel_for_each(ContainerIterator beg, ContainerIterator end, AlgorithmPerEntry& algorithm, containerSizeType maxProbSize)
{   
    containerSizeType len = end - beg;
    if (len < maxProbSize){//if you are sufficiently small, go ahead and execute
        std::for_each(beg, end, algorithm);
        std::cout << "working on processor with id = " << GetCurrentProcessorNumber() << std::endl;//the processor id's change so I'm assuming this is executing in parallel
        return;
    }
    //otherwise, continue spawning more threads
    ContainerIterator mid = beg + len / 2;
    auto handle = std::async(std::launch::async,
        parallel_for_each<ContainerIterator, containerSizeType, AlgorithmPerEntry>, mid, end, algorithm, maxProbSize);
    parallel_for_each(beg, mid, algorithm, maxProbSize);
    handle.get(); //corrected as advised
}

I wanted to test it with a super simple functor, so I made the following:

template<typename T>
struct dataSetter
{
    const T& set_to;
    dataSetter(const T& set_to_in) : set_to(set_to_in){}

    void operator()(T& set_this)
    {
        set_this = set_to;
    }
};

Pretty straightforward: its operator() just assigns the stored value to its argument.

Here's my main function's body

std::vector<int> ints(100000);
unsigned minProbSize = 1000;
int setval = 7;

dataSetter<int> setter(setval);

parallel_for_each(ints.begin(), ints.end(), setter, minProbSize);//parallel assign everything to 7
//some sort of wait function to go here?
std::cout << std::endl << "PS sum of all ints = " << parallel_sum(ints.begin(), ints.end()) << std::endl; //parallel sum the entries

int total = 0;//serial sum the entries
for (unsigned i = 0; i < ints.size(); i++)
    total += ints[i];

std::cout << std::endl << "S sum of all ints = " << total << std::endl;
std::cout << std::endl << "PS sum of all ints = " << parallel_sum(ints.begin(), ints.end()) << std::endl; //parallel sum the entries again

Here are some outputs :

PS sum of all ints = 689052

S sum of all ints = 700000

PS sum of all ints = 700000

output from another run:

PS sum of all ints = 514024

S sum of all ints = 700000

PS sum of all ints = 700000

It consistently gets the first parallel sum over the vector low. My guess as to what is happening is that all the assignment threads get created, then the summing threads get created, but certain sum threads are executing prematurely (before the last assignment thread). Is there any way I can force a wait? And as always, I'm open to all advice.

trashpanda
  • You aren't calling `handle.get()`. This means you aren't waiting for the async invocation to complete. When the "root" `parallel_for_each` call returns, some "spawned" asynchronous calls may still be running, assigning to the very same elements your `parallel_sum` is reading. Your program therefore exhibits undefined behavior by way of a data race. – Igor Tandetnik Sep 18 '17 at 01:02
  • Or rather, this was the behavior in C++11. In C++14, the destructor of `handle` would block until the async invocation completes. I guess you are using an older compiler that's conforming to C++11 but not C++14. – Igor Tandetnik Sep 18 '17 at 01:05
  • So I added `handle.get()` to occur right after the handle creation. My answer is correct now (thank you). Do you know a good resource to read up on what exactly it does? My processor id prints are still coming up with different numbers, so I'm assuming parallelism is occurring. I just want to make sure I'm not killing my parallelism by misplacing the `handle.get()`. – trashpanda Sep 18 '17 at 01:10
  • `handle.get()` has a side effect of blocking until the asynchronous operation completes (its primary effect is to obtain the return value of said asynchronous operation, but yours doesn't return any). You want to call it after the synchronous call - basically, a) start async, b) do something else for a while, c) wait for (a). If you swap (b) and (c), you defeat the purpose. – Igor Tandetnik Sep 18 '17 at 01:14
  • I see. I made my `handle.get()` call occur at the end of my function (like the original example, duh) and now my output is looking more like parallel output (e.g. kinda jumbled). Thanks again! – trashpanda Sep 18 '17 at 01:22
  • @igor The problem isn't C++11 vs C++14; both state that the destructor of the future returned from an async `std::async` should block. The problem is that MSVC shipped an intentionally non-compliant `std::async`. They fixed it in MSVS2015, I think. – Yakk - Adam Nevraumont Sep 18 '17 at 02:39
  • @Yakk [cppreference](http://en.cppreference.com/w/cpp/thread/future/~future) seems to disagree. I must admit I haven't checked the actual standard. – Igor Tandetnik Sep 18 '17 at 02:43
  • @igor https://stackoverflow.com/questions/23455104/why-is-the-destructor-of-a-future-returned-from-stdasync-blocking cppreference is in error. – Yakk - Adam Nevraumont Sep 18 '17 at 03:15
  • @Yakk I see. **[futures.async]/5** (the last bullet) is what apparently makes `~future` block for an `async` manufactured future. That's quite subtle - it takes some work to put all the pieces together. I guess it's a good thing that C++14 added a clarification. – Igor Tandetnik Sep 18 '17 at 12:47
  • With C++17, parallel `for_each` is simply `std::for_each(std::execution::par, first, last, pred)`. – Pete Becker Sep 18 '17 at 14:21
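
The ordering described in the comments (a: start the async work, b: do something else, c: wait for a) can be sketched with a task that actually returns a value, which also shows the primary effect of `get()`. This is only an illustrative sketch: `sum_in_two_halves` is a hypothetical helper name, not something from the question.

```cpp
#include <cassert>
#include <future>
#include <numeric>
#include <vector>

// Hypothetical helper (not from the question): sums a vector in two halves.
long long sum_in_two_halves(const std::vector<int>& values)
{
    const auto mid = values.begin() + values.size() / 2;

    // a) start the asynchronous half on another thread
    std::future<long long> handle =
        std::async(std::launch::async, [&values, mid] {
            return std::accumulate(mid, values.end(), 0LL);
        });

    // b) meanwhile, do the other half in the calling thread
    const long long lower = std::accumulate(values.begin(), mid, 0LL);

    // c) block until (a) has finished and fetch its result;
    //    get() may be called only once on a given future
    assert(handle.valid());
    return lower + handle.get();
}
```

Moving step (c) above step (b) would still give the right sum, but the two halves would then run one after the other.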

1 Answer

MSVS 2013 shipped with a non-standard-compliant `std::async`. From what I have heard, this was intentional.

This non-compliant `std::async` returned futures whose destructors do not block until the task completes, even when invoked with the `std::launch::async` policy.

The result is that your code is correct, but your compiler is broken. Either upgrade to MSVS 2015/2017 or call `handle.get()` just before `handle` goes out of scope.
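
As a rough sketch of the second option, here is a variant of the question's function that waits explicitly, so it does not rely on the future's destructor blocking. The name `parallel_for_each_checked` is made up for this example. It also takes the functor by value, which is an assumption on my part: each task gets its own copy, so a stateful functor would need something like `std::ref`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <future>

// Hypothetical variant of the question's parallel_for_each.
template <typename RandomIt, typename Func>
void parallel_for_each_checked(RandomIt first, RandomIt last,
                               Func func, std::size_t max_chunk)
{
    const auto len = last - first;
    assert(len >= 0);

    // Small enough (or too small to split): run in the current thread.
    if (len < 2 || static_cast<std::size_t>(len) <= max_chunk) {
        std::for_each(first, last, func);
        return;
    }

    RandomIt mid = first + len / 2;

    // a) start the upper half asynchronously; the lambda copies its
    //    captures, so nothing dangles if this frame unwinds early
    auto handle = std::async(std::launch::async, [=] {
        parallel_for_each_checked(mid, last, func, max_chunk);
    });

    // b) process the lower half in this thread
    parallel_for_each_checked(first, mid, func, max_chunk);

    // c) wait explicitly before `handle` goes out of scope
    handle.get();
}
```

Like the original, this spawns roughly one thread per chunk, so a small `max_chunk` on a large range creates a lot of threads.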


Personally I would write such a utility a bit differently.

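// headers needed by the snippets in this answer
#include <atomic>
#include <cstddef>
#include <exception>
#include <future>
#include <iostream>
#include <type_traits>
#include <utility>
#include <vector>

auto launch_async = [](auto&& f){
  return std::async( std::launch::async, decltype(f)(f) );
};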
template <class Linear, class Executor=decltype(launch_async) const&>
auto parallel_algo(Linear&& linear, std::size_t chunk_size, Executor&& exec=launch_async){
  return 
    [
      linear=std::forward<Linear>(linear),
      chunk_size,
      exec=std::forward<Executor>(exec)
    ]
    ( auto start, auto finish )
    {
    std::size_t count = finish-start;
    if (count <= chunk_size) {
      linear( start, finish);
      return;
    }
    std::size_t par = (count+chunk_size-1)/chunk_size;
    std::vector<std::future<void>> tasks( par-1 );
    auto task=[&]( auto i ){
      auto b = start+( count*i/par );
      auto e = start+( count*(i+1)/par );
      return [b,e,linear]{ linear(b,e); };
    };
    for(auto& f:tasks){
      auto i = &f-tasks.data();
      f = exec( task(i) );
    }
    task(par-1)();
    for (auto&f:tasks) f.get();
  };
}
template<class F>
auto foreacher( F&&f ){
    return [f=std::forward<F>(f)]( auto b, auto e ){
      for (auto i=b; i!=e; ++i) f(*i);
    };
}

This is C++14, but it can be emulated in C++11.

What this does is take a linear algorithm and a maximum chunk size, and return a parallel algorithm.

In both cases, the algorithms take an iterator range.

While I was in there, I made the parallel algorithm factory take an executor. You can write an executor, for example a thread pool, to avoid generating too many threads needlessly. Single-element algorithms use `foreacher` to lift themselves to range algorithms.

Some toy executors:

auto launch_deferred = [](auto&& f){
  return std::async( std::launch::deferred, decltype(f)(f) );
};
template<class F, class R=std::result_of_t< F&&() >>
std::enable_if_t< !std::is_same<R,void>::value, std::future<R> >
make_ready_future( F&& f ) {
  std::promise<R> p;
  try {
    p.set_value( decltype(f)(f)() );
  } catch( ... ) {
    p.set_exception(std::current_exception());
  }
  return p.get_future();
}
template<class F, class R=std::result_of_t< F&&() >>
std::enable_if_t< std::is_same<R,void>::value, std::future<R> >
make_ready_future( F&& f ) {
  std::promise<void> p;
  try {
    decltype(f)(f)();
    p.set_value();
  } catch( ... ) {
    p.set_exception(std::current_exception());
  }
  return p.get_future();
}

auto launch_ready = [](auto&& f){
  return make_ready_future( decltype(f)(f) );
};

Both of these make your parallel code run in a single thread.

A fancier one queues tasks in a thread pool and similarly returns futures.
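
To give an idea of what that could look like, here is a minimal sketch of a fixed-size pool with an executor adapter. None of this is from a library: `thread_pool`, `submit` and `make_pool_executor` are names invented for the example, and a production pool would need more care (shutdown policy, exceptions during construction, and so on).

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical fixed-size thread pool (illustrative only).
class thread_pool {
public:
    explicit thread_pool(std::size_t n) {
        assert(n > 0);
        for (std::size_t i = 0; i < n; ++i)
            workers_.emplace_back([this] { run(); });
    }
    ~thread_pool() {
        {
            std::lock_guard<std::mutex> lock(m_);
            done_ = true;
        }
        cv_.notify_all();
        for (auto& w : workers_) w.join();  // drains queued jobs first
    }
    thread_pool(const thread_pool&) = delete;
    thread_pool& operator=(const thread_pool&) = delete;

    // Queue f and return a future for its result.
    template <class F>
    auto submit(F&& f) {
        using R = decltype(f());
        // packaged_task is move-only; share it so std::function can hold it
        auto task = std::make_shared<std::packaged_task<R()>>(std::forward<F>(f));
        std::future<R> result = task->get_future();
        {
            std::lock_guard<std::mutex> lock(m_);
            queue_.push([task] { (*task)(); });
        }
        cv_.notify_one();
        return result;
    }

private:
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [this] { return done_ || !queue_.empty(); });
                if (queue_.empty()) return;  // shutting down, nothing left
                job = std::move(queue_.front());
                queue_.pop();
            }
            job();  // run outside the lock
        }
    }

    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> queue_;
    std::vector<std::thread> workers_;
    bool done_ = false;
};

// Adapts a pool to the "callable in, future out" executor shape used above.
inline auto make_pool_executor(thread_pool& pool) {
    return [&pool](auto&& f) { return pool.submit(std::forward<decltype(f)>(f)); };
}
```

The result of `make_pool_executor(pool)` should be usable as the third argument of `parallel_algo`, as long as the pool outlives the call. Unlike the futures from `std::async`, these futures do not block in their destructor, and calling `parallel_algo` with the same pool from inside a pool task can deadlock once every worker is waiting on a future.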

Here is test code:

std::vector<int> v(10000);
parallel_algo( foreacher([](auto&x){x=7;}), 100 )( v.begin(), v.end() );
std::atomic<int> total(0);
parallel_algo( foreacher([&total](auto&x){total+=x;}), 100 )( v.begin(), v.end() );
std::cout << total << "\n";
Yakk - Adam Nevraumont