74

I wonder if there is a lightweight, straightforward way to make loops such as for and range-based for loops run in parallel in C++. How would you implement such a thing? From Scala I know the map, filter and foreach functions; would it also be possible to perform these in parallel? Is there an easy way to achieve this in C++?

My primary platform is Linux, but it would be nice if it worked cross-platform.

Alexis Wilke
Exagon

8 Answers

106

With the parallel algorithms in C++17 we can now use:

std::vector<std::string> foo;
std::for_each(
    std::execution::par,
    foo.begin(),
    foo.end(),
    [](auto&& item)
    {
        //do stuff with item
    });

to compute loops in parallel. The first argument specifies the execution policy (std::execution::seq, par, or par_unseq, all declared in the <execution> header).

Pavel P
Exagon
29

What is your platform? You can look at OpenMP: it is not part of the C++ standard, but it is widely supported by compilers.

As for range-based for loops, see, e.g., Using OpenMP with C++11 range-based for loops?.

I've also seen a few documents at http://www.open-std.org that indicate some efforts to incorporate parallel constructs/algorithms into future C++, but I don't know what their current status is.

UPDATE

Just adding some example code:

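#include <cstddef>   // size_t
#include <iterator>  // std::distance

template <typename RAIter>
void loop_in_parallel(RAIter first, RAIter last) {
   const size_t n = std::distance(first, last);

   // The iterations are divided among the threads of the OpenMP team
   #pragma omp parallel for
   for (size_t i = 0; i < n; i++) {
       auto& elem = *(first + i);
       // do whatever you want with elem
   }
}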

The number of threads can be set at runtime via the OMP_NUM_THREADS environment variable. With GCC or Clang, OpenMP has to be enabled at compile time with the -fopenmp flag.
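As a follow-up to the loop above, here is a minimal sketch of a loop that also combines a result across threads, using an OpenMP reduction. The function name sum_of_squares and the thread_count parameter are assumptions made for this illustration. If OpenMP is not enabled, the pragma is ignored and the loop simply runs sequentially.

```cpp
#include <cstddef>
#include <vector>

// Sum of squares with an OpenMP reduction. Each thread accumulates into a
// private copy of `total`; OpenMP adds the copies together after the loop.
// `thread_count` is a hypothetical parameter for this sketch.
long long sum_of_squares(const std::vector<int>& values, int thread_count = 2) {
    long long total = 0;
    // A signed index keeps the loop acceptable to older OpenMP implementations.
    const std::ptrdiff_t n = static_cast<std::ptrdiff_t>(values.size());

    // schedule(static) hands each thread one contiguous block of iterations,
    // so with two threads each one processes roughly half of the range.
    #pragma omp parallel for num_threads(thread_count) schedule(static) reduction(+ : total)
    for (std::ptrdiff_t i = 0; i < n; ++i) {
        total += static_cast<long long>(values[i]) * values[i];
    }
    (void)thread_count;  // avoids an "unused parameter" warning without OpenMP
    return total;
}
```

The schedule(static) clause is one way to get the "one half per thread" split asked about in the comments below.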

Community
Daniel Langr
  • Let's say I have a not-so-expensive operation in the loop. Is it possible to split the loop in half, so that one thread does one half and the other does the rest? And the same with 3 or more threads? – Exagon Mar 27 '16 at 10:45
  • What are you iterating over? Can you use indexes for looping? – Daniel Langr Mar 27 '16 at 10:48
  • @Exagon It depends on how you distribute the work among the threads. You can add conditions to the loops that divide the work. – Hamza Anis Mar 27 '16 at 10:49
  • Yes, I can. It would be nice if it also worked with range-based for loops, but I can also access the data via indexes – Exagon Mar 27 '16 at 10:50
  • 1
    With `RandomAccessIterator`s you can work with offsets within the loop and then access elements as `*(container.begin()+i)`. – Daniel Langr Mar 27 '16 at 10:51
  • Is OpenMP usable on Android/iOS? – Lothar May 11 '23 at 15:47
22

With C++11 you can parallelize a for loop with only a few lines of code.

My function parallel_for() (defined later in the post) splits a for loop into smaller chunks (sub-loops), and each chunk is assigned to a thread. Here is the usage:

/// Say you want to parallelize this:
for(int i = 0; i < nb_elements; ++i)
    computation(i);    

/// Then you would do:
parallel_for(nb_elements, [&](int start, int end){ 
    for(int i = start; i < end; ++i)
        computation(i); 
});

My parallel_for() also works within a class:

struct My_obj {

    /// Replacing:
    void sequential_for(){
        for(int i = 0; i < nb_elements; ++i)
            computation(i);
    }

    /// By:
    void process_chunk(int start, int end)
    {
        for(int i = start; i < end; ++i)
            computation(i);
    }

    void threaded_for(){
        parallel_for(nb_elements, [this](int s, int e){ 
            this->process_chunk(s, e); 
        } );
    }

    
};

Finally, here is the implementation of parallel_for(); just paste it into a header file and use it at will:

#include <algorithm>
#include <thread>
#include <functional>
#include <vector>

/// @param[in] nb_elements : size of your for loop
/// @param[in] functor(start, end) :
/// your function processing a sub chunk of the for loop.
/// "start" is the first index to process (included) until the index "end"
/// (excluded)
/// @code
///     for(int i = start; i < end; ++i)
///         computation(i);
/// @endcode
/// @param use_threads : enable / disable threads.
///
///
static
void parallel_for(unsigned nb_elements,
                  std::function<void (int start, int end)> functor,
                  bool use_threads = true)
{
    // -------
    unsigned nb_threads_hint = std::thread::hardware_concurrency();
    unsigned nb_threads = nb_threads_hint == 0 ? 8 : (nb_threads_hint);

    unsigned batch_size = nb_elements / nb_threads;
    unsigned batch_remainder = nb_elements % nb_threads;

    std::vector< std::thread > my_threads(nb_threads);

    if( use_threads )
    {
        // Multithread execution
        for(unsigned i = 0; i < nb_threads; ++i)
        {
            int start = i * batch_size;
            my_threads[i] = std::thread(functor, start, start+batch_size);
        }
    }
    else
    {
        // Single thread execution (for easy debugging)
        for(unsigned i = 0; i < nb_threads; ++i){
            int start = i * batch_size;
            functor( start, start+batch_size );
        }
    }

    // Process the remaining elements on the calling thread
    int start = nb_threads * batch_size;
    functor( start, start+batch_remainder);

    // Wait for the other threads to finish their tasks
    if( use_threads )
        std::for_each(my_threads.begin(), my_threads.end(), std::mem_fn(&std::thread::join));
}
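The implementation above gives every thread the same batch size and processes the leftover elements on the calling thread afterwards. As a variation on the same idea (not the author's code), here is a minimal sketch that takes the callable as a template parameter instead of std::function and spreads the remainder over the first chunks. The name parallel_for_chunks is hypothetical.

```cpp
#include <algorithm>
#include <thread>
#include <vector>

// Hypothetical variant of the parallel_for() idea: the callable is a template
// parameter (no std::function), and the remainder is spread over the first
// chunks so that chunk sizes differ by at most one element.
template <class ChunkFn>
void parallel_for_chunks(unsigned nb_elements, ChunkFn chunk_fn) {
    if (nb_elements == 0) return;

    unsigned hint = std::thread::hardware_concurrency();
    // Never use more threads than there are elements.
    unsigned nb_threads = std::min(hint == 0 ? 8u : hint, nb_elements);

    unsigned base = nb_elements / nb_threads;
    unsigned extra = nb_elements % nb_threads;

    std::vector<std::thread> workers;
    workers.reserve(nb_threads - 1);

    unsigned start = 0;
    for (unsigned t = 0; t < nb_threads; ++t) {
        // The first `extra` chunks get one additional element each.
        unsigned end = start + base + (t < extra ? 1u : 0u);
        if (t + 1 < nb_threads) {
            workers.emplace_back([&chunk_fn, start, end] { chunk_fn(start, end); });
        } else {
            chunk_fn(start, end);  // the calling thread handles the last chunk
        }
        start = end;
    }
    for (auto& w : workers) w.join();
}
```

Note that this sketch does not handle exceptions thrown by chunk_fn; production code would need to join the workers before propagating an error.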

Lastly, you can define macros to get an even more compact expression:

#define PARALLEL_FOR_BEGIN(nb_elements) parallel_for(nb_elements, [&](int start, int end){ for(int i = start; i < end; ++i)
#define PARALLEL_FOR_END()})

Now converting a sequential for:

for(int i = 0; i < nb_elements; ++i)
    computation(i);

Is only a matter of doing:

PARALLEL_FOR_BEGIN(nb_elements)
{
    computation(i);
}PARALLEL_FOR_END();
starball
arkan
20

std::async may be a good fit here, if you are happy to let the C++ runtime control the parallelism.

Example from cppreference.com:

#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>

template <typename RAIter>
int parallel_sum(RAIter beg, RAIter end)
{
    auto len = end - beg;
    if(len < 1000)
        return std::accumulate(beg, end, 0);

    RAIter mid = beg + len/2;
    auto handle = std::async(std::launch::async,
                              parallel_sum<RAIter>, mid, end);
    int sum = parallel_sum(beg, mid);
    return sum + handle.get();
}

int main()
{
    std::vector<int> v(10000, 1);
    std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
}
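The same std::async building block can also cover the map-style operation mentioned in the question. Below is a minimal sketch, assuming the input is a std::vector and that the result type is default-constructible and not bool (std::vector<bool> packs its elements, so concurrent writes to it would race). The name parallel_map is hypothetical. It starts at most one task per hardware thread rather than recursing.

```cpp
#include <algorithm>
#include <cstddef>
#include <future>
#include <thread>
#include <vector>

// Hypothetical helper: applies `fn` to every element of `in` and returns the
// results in order, using at most one std::async task per hardware thread.
// Assumes the result type is default-constructible and is not bool.
template <class T, class Fn>
auto parallel_map(const std::vector<T>& in, Fn fn)
    -> std::vector<decltype(fn(in[0]))> {
    using R = decltype(fn(in[0]));
    std::vector<R> out(in.size());
    if (in.empty()) return out;

    unsigned hint = std::thread::hardware_concurrency();
    std::size_t tasks = std::min<std::size_t>(hint == 0 ? 2 : hint, in.size());
    std::size_t chunk = (in.size() + tasks - 1) / tasks;  // ceiling division

    std::vector<std::future<void>> pending;
    for (std::size_t begin = 0; begin < in.size(); begin += chunk) {
        std::size_t end = std::min(begin + chunk, in.size());
        pending.push_back(std::async(std::launch::async, [&in, &out, &fn, begin, end] {
            // Each task writes to its own slice of `out`, so no locking is needed.
            std::transform(in.begin() + begin, in.begin() + end, out.begin() + begin, fn);
        }));
    }
    for (auto& f : pending) f.get();  // rethrows an exception thrown by a task
    return out;
}
```

For example, parallel_map(v, [](int x) { return x * x; }) would return the squares of the elements of v in their original order.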
bobah
  • 1
    is there any documentation on how c++ handles the parallel tasks and asyncs? – Exagon Mar 27 '16 at 12:36
  • 2
    The first parameter to `std::async()` specifies what freedom you give to the framework (primarily whether you allow using the foreground thread). As to what it does in the background: it's compiler specific, but most probably on most compilers there will be a singleton thread pool with N = number of CPU cores on the box. The best usage documentation I have come across so far is the concurrency chapter from the last Meyers book. – bobah Mar 27 '16 at 12:43
  • 2
    Considering that `async` will launch a new thread each time, this solution is not so effective. Will you spawn 1000 new threads, each with a stack of 1+ MB? – David Haim Mar 27 '16 at 12:48
  • @DavidHaim - whether async spawns a thread or uses a background pool is implementation specific (but yes, with GCC it does spawn). No one says about spawning 1000s of threads, obviously (though on a box with 1000s of CPUs - why not), but spawning a few. running several seconds in each and terminating them may be well worth doing. It all depends on the specifics of the problem. – bobah Mar 27 '16 at 13:03
  • Maybe I will implement my own thread pool for this problem instead of using async, because if it really spawns a new thread for every async call this would be a pain. – Exagon Mar 28 '16 at 11:25
  • @bobah : "*whether async spawns a thread or uses a background pool is implementation specific*" Not really – the standard says that it must behave _as-if_ on a new thread, which basically means it must be on a new thread, otherwise `thread_local` wouldn't work. – ildjarn Apr 11 '16 at 21:29
  • If you are a Windows person, you will probably freak out about creating new threads, because it is excruciatingly slow on Windows. Windows people might not realize that thread creation is shockingly fast on linux, you won't believe how fast it is. That explains part of why C++ standard seems to be so "foolish" about thread creation. – doug65536 Mar 25 '23 at 11:02
  • @doug65536 - it is not fast-fast on Linux either, everything is relative. When you code something of nanoseconds grade you'd be very particular about your threading on Linux too. – bobah Mar 26 '23 at 12:32
  • @bobah It's microseconds. It's the stack allocation, plus basically zero. If you reuse pooled stacks, it basically is zero. – doug65536 Mar 29 '23 at 09:54
5

This can be done using threads, specifically the pthreads library functions, which can be used to perform operations concurrently.

You can read more about them here : http://www.tutorialspoint.com/cplusplus/cpp_multithreading.htm

std::thread can also be used : http://www.cplusplus.com/reference/thread/thread/

Below is code in which I use the ID of each thread to split the array into two halves:

#include <iostream>
#include <cstdlib>
#include <pthread.h>

using namespace std;

#define NUM_THREADS 2

int arr[10] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};

void *splitLoop(void *threadid)
{
   long tid;
   tid = (long)threadid;
   //cout << "Hello World! Thread ID, " << tid << endl;
   int start = (tid * 5);
   int end = start + 5;
   for(int i = start;i < end;i++){
      cout << arr[i] << " ";
   }
   cout << endl;
   pthread_exit(NULL);
}

int main ()
{
   pthread_t threads[NUM_THREADS];
   int rc;
   long i;   // long, so that the cast to void* below does not change size on 64-bit Linux
   for( i=0; i < NUM_THREADS; i++ ){
      cout << "main() : creating thread, " << i << endl;
      rc = pthread_create(&threads[i], NULL, 
                          splitLoop, (void *)i);
      if (rc){
         cout << "Error:unable to create thread," << rc << endl;
         exit(-1);
      }
   }
   pthread_exit(NULL);
}

Also remember that when compiling with GCC or Clang you have to pass the -pthread flag (or link with -lpthread).

Link to solution on Ideone : http://ideone.com/KcsW4P
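For comparison, the same two-way split can be written with std::thread, which avoids the void* casts. This is a minimal sketch rather than a drop-in replacement: the name process_in_halves is hypothetical, and instead of printing it applies a caller-supplied function to each element.

```cpp
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Applies `fn` to every element of `data`, one half per thread, mirroring the
// two-way split of the pthreads example. `process_in_halves` is a hypothetical name.
inline void process_in_halves(std::vector<int>& data,
                              const std::function<void(int&)>& fn) {
    const std::size_t mid = data.size() / 2;

    // Processes the elements in the index range [begin, end).
    auto worker = [&data, &fn](std::size_t begin, std::size_t end) {
        for (std::size_t i = begin; i < end; ++i) fn(data[i]);
    };

    std::thread first_half(worker, std::size_t{0}, mid);  // first half on a new thread
    worker(mid, data.size());                             // second half on the calling thread
    first_half.join();                                    // wait before returning
}
```

The two threads touch disjoint halves of the vector, so no synchronization is needed beyond the final join(), provided fn itself has no shared state.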

uSeemSurprised
5

As this thread has been my answer almost every time I've looked for a method to parallelize something, I've decided to add a bit to it, based on the method by arkan (see his answer).

The two following methods are almost the same and allow a simple syntax. Simply include the header file in your project and call one of the parallel versions:

Example:

#include <iostream>
#include "par_for.h"

int main() {
//replace - 
for(unsigned i = 0; i < 10; ++i){
    std::cout << i << std::endl;
}

//with -
//method 1:
pl::thread_par_for(0, 10, [&](unsigned i){
            std::cout << i << std::endl;   //do something here with the index i
        });   //changing the end to },false); will make the loop sequential

//or method 2:
pl::async_par_for(0, 10, [&](unsigned i){
            std::cout << i << std::endl;   //do something here with the index i
        });   //changing the end to },false); will make the loop sequential

return 0;
}

header file - par_for.h:

#include <thread>
#include <vector>
#include <functional>
#include <future>

namespace pl{

    inline void thread_par_for(unsigned start, unsigned end, std::function<void(unsigned i)> fn, bool par = true){

        //internal loop
        auto int_fn = [&fn](unsigned int_start, unsigned seg_size){
            for (unsigned j = int_start; j < int_start+seg_size; j++){
                fn(j);
            }
        };

        //sequenced for
        if(!par){
            return int_fn(start, end - start);   //int_fn takes a length, not an end index
        }

        //get number of threads
        unsigned nb_threads_hint = std::thread::hardware_concurrency();
        unsigned nb_threads = nb_threads_hint == 0 ? 8 : (nb_threads_hint);

        //calculate segments
        unsigned total_length = end - start;
        unsigned seg = total_length/nb_threads;
        unsigned last_seg = seg + total_length%nb_threads;

        //launch threads - parallel for
        auto threads_vec = std::vector<std::thread>();
        threads_vec.reserve(nb_threads);
        for(unsigned k = 0; k < nb_threads-1; ++k){
            unsigned current_start = start + seg*k;   //offset by the start of the range
            threads_vec.emplace_back(std::thread(int_fn, current_start, seg));
        }
        {
            unsigned current_start = start + seg*(nb_threads-1);
            threads_vec.emplace_back(std::thread(int_fn, current_start, last_seg));
        }
        for (auto& th : threads_vec){
            th.join();
        }
    }




    inline void async_par_for(unsigned start, unsigned end, std::function<void(unsigned i)> fn, bool par = true){

        //internal loop
        auto int_fn = [&fn](unsigned int_start, unsigned seg_size){
            for (unsigned j = int_start; j < int_start+seg_size; j++){
                fn(j);
            }
        };

        //sequenced for
        if(!par){
            return int_fn(start, end - start);   //int_fn takes a length, not an end index
        }

        //get number of threads
        unsigned nb_threads_hint = std::thread::hardware_concurrency();
        unsigned nb_threads = nb_threads_hint == 0 ? 8 : (nb_threads_hint);

        //calculate segments
        unsigned total_length = end - start;
        unsigned seg = total_length/nb_threads;
        unsigned last_seg = seg + total_length%nb_threads;

        //launch threads - parallel for
        auto fut_vec = std::vector<std::future<void>>();
        fut_vec.reserve(nb_threads);
        for(unsigned k = 0; k < nb_threads-1; ++k){
            unsigned current_start = start + seg*k;   //offset by the start of the range
            fut_vec.emplace_back(std::async(int_fn, current_start, seg));
        }
        {
            unsigned current_start = start + seg*(nb_threads-1);
            fut_vec.emplace_back(std::async(std::launch::async, int_fn, current_start, last_seg));
        }
        for (auto& th : fut_vec){
            th.get();
        }
    }
}

Some simple tests suggest the method with async is faster, probably because the standard library controls whether to actually launch a new thread or not.

Alexis Wilke
Adam
2

Concurrency::parallel_for (PPL) is also one of the nice options for task parallelism.

Taken from C++ Coding Exercise – Parallel For – Monte Carlo PI Calculation

int main() {
    srand(time(NULL)); // seed
    const int N1 = 1000;
    const int N2 = 100000;
    int n = 0;
    int c = 0;
    Concurrency::critical_section cs;
    // it is better that N2 >> N1 for better performance
    Concurrency::parallel_for(0, N1, [&](int i) {
        int t = monte_carlo_count_pi(N2);
        cs.lock(); // race condition
        n += N2;   // total sampling points
        c += t;    // points fall in the circle
        cs.unlock();
    });
    cout << "pi ~= " << setprecision(9) << (double)c / n * 4.0 << endl;
    return 0;
}
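Since PPL is specific to Microsoft's toolchain, here is a rough portable sketch of the same Monte Carlo estimate using only the standard library. It is not taken from the linked exercise: count_points_in_circle is a hypothetical stand-in for monte_carlo_count_pi, estimate_pi is a made-up name, and the seeds are arbitrary. Each task returns its own count through a future, so no critical section is needed.

```cpp
#include <future>
#include <random>
#include <vector>

// Hypothetical stand-in for monte_carlo_count_pi(): counts how many of
// `samples` random points in the unit square fall inside the quarter circle.
inline long count_points_in_circle(long samples, unsigned seed) {
    std::mt19937 rng(seed);  // one generator per task, so no shared state
    std::uniform_real_distribution<double> coord(0.0, 1.0);
    long inside = 0;
    for (long i = 0; i < samples; ++i) {
        double x = coord(rng);
        double y = coord(rng);
        if (x * x + y * y <= 1.0) ++inside;
    }
    return inside;
}

// Runs `tasks` independent sampling tasks and combines their counts.
// Assumes tasks > 0 and samples_per_task > 0.
inline double estimate_pi(int tasks, long samples_per_task) {
    std::vector<std::future<long>> counts;
    for (int t = 0; t < tasks; ++t) {
        // Arbitrary, distinct seed per task (an assumption of this sketch).
        counts.push_back(std::async(std::launch::async, count_points_in_circle,
                                    samples_per_task, 1000u + t));
    }
    long inside = 0;
    for (auto& c : counts) inside += c.get();  // summed on one thread: no lock needed
    return 4.0 * inside / (static_cast<double>(tasks) * samples_per_task);
}
```

Returning per-task counts and summing them at the end replaces the lock around the shared counters in the PPL version.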
justyy
  • 3
    PPL is not cross-platform though. https://en.wikipedia.org/wiki/Parallel_Patterns_Library – arkan Mar 07 '20 at 06:12
2

Starting from C++17, std::for_each has overloads that allow parallel execution. In my case, however, my algorithm required a specific number of threads for optimal execution, while the std::for_each implementation in VS 2022 picks the number of threads based on std::thread::hardware_concurrency.

For those who want to control the number of parallel workers, this simple implementation should behave similarly to std::for_each without requiring C++17:

#include <algorithm>  // std::max
#include <mutex>
#include <thread>
#include <vector>

template <class Iter, class Func>
void parallel_for_each(unsigned threadCount, Iter first, Iter last, Func func)
{
    Iter it = first;
    if (it == last)
        return;
    if (++it == last)
    {
        func(*first);
        return;
    }

    if (threadCount == 0)
        threadCount = std::max(2u, std::thread::hardware_concurrency());

    std::mutex mx;
    std::vector<std::thread> threads;
    threads.reserve(threadCount - 1);

    auto func2 = [&]() {
        for (;;)
        {
            Iter it;
            {
                std::lock_guard<std::mutex> lock(mx);
                it = first;
                if (it == last)
                    break;
                ++first;
            }
            func(*it);
        }
    };
    for (unsigned i = 0; i < threadCount - 1; ++i, ++it)
    {
        if (it == last)
            break;
        threads.emplace_back(std::thread(func2));
    }
    func2();
    for (auto& th : threads)
        th.join();
}

template <class Iter, class Func>
void parallel_for_each(Iter first, Iter last, Func func)
{
    parallel_for_each(std::thread::hardware_concurrency(), first, last, func);
}
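The version above locks a mutex once per element, which works for any iterator category. When the iterators are random-access, one possible variation (a sketch, not part of the original answer) is to let the workers claim indexes from an atomic counter instead. The name parallel_for_each_indexed and the fallback of two threads are assumptions made for this illustration.

```cpp
#include <atomic>
#include <cstddef>
#include <iterator>
#include <thread>
#include <vector>

// Hypothetical variant for random-access iterators: workers claim the next
// index with an atomic fetch_add instead of locking a mutex per element.
template <class RandomIt, class Func>
void parallel_for_each_indexed(unsigned threadCount, RandomIt first, RandomIt last, Func func)
{
    const std::size_t count = static_cast<std::size_t>(std::distance(first, last));
    if (count == 0)
        return;
    if (threadCount == 0)
        threadCount = 2;  // arbitrary fallback chosen for this sketch
    if (threadCount > count)
        threadCount = static_cast<unsigned>(count);  // no idle threads

    std::atomic<std::size_t> next{0};
    auto worker = [&]() {
        for (;;)
        {
            const std::size_t i = next.fetch_add(1);  // claim one index
            if (i >= count)
                break;
            func(first[i]);
        }
    };

    std::vector<std::thread> threads;
    threads.reserve(threadCount - 1);
    for (unsigned i = 0; i + 1 < threadCount; ++i)
        threads.emplace_back(worker);
    worker();  // the calling thread participates too
    for (auto& th : threads)
        th.join();
}
```

As with the mutex-based version, func is called concurrently from several threads, so it must not modify shared state without its own synchronization.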
Pavel P