15

Given n partial sums it's possible to sum all the partial sums in log2 parallel steps. For example assume there are eight threads with eight partial sums: s0, s1, s2, s3, s4, s5, s6, s7. This could be reduced in log2(8) = 3 sequential steps like this;

thread0     thread1    thread2    thread4
s0 += s1    s2 += s3   s4 += s5   s6 +=s7
s0 += s2    s4 += s6
s0 += s4

I would like to do this with OpenMP but I don't want to use OpenMP's reduction clause. I have come up with a solution but I think a better solution can be found maybe using OpenMP's task clause.

This is more general than scalar addition. Let me choose a more useful case: an array reduction (see here, here, and here for more about array reductions).

Let's say I want to do an array reduction on an array a. Here is some code which fills private arrays in parallel for each thread.

int bins = 20;
int a[bins];
int **at;  // array of pointers to arrays
for(int i = 0; i<bins; i++) a[i] = 0;
#pragma omp parallel
{
    #pragma omp single   
    at = (int**)malloc(sizeof *at * omp_get_num_threads());        
    at[omp_get_thread_num()] = (int*)malloc(sizeof **at * bins);
    int a_private[bins];
    //arbitrary function to fill the arrays for each thread
    for(int i = 0; i<bins; i++) at[omp_get_thread_num()][i] = i + omp_get_thread_num();
}

At this point I have have an array of pointers to arrays for each thread. Now I want to add all these arrays together and write the final sum to a. Here is the solution I came up with.

#pragma omp parallel
{
    int n = omp_get_num_threads();
    for(int m=1; n>1; m*=2) {
        int c = n%2;
        n/=2;
        #pragma omp for
        for(int i = 0; i<n; i++) {
            int *p1 = at[2*i*m], *p2 = at[2*i*m+m];
            for(int j = 0; j<bins; j++) p1[j] += p2[j];
        }
        n+=c;
    }
    #pragma omp single
    memcpy(a, at[0], sizeof *a*bins);
    free(at[omp_get_thread_num()]);
    #pragma omp single
    free(at);
}

Let me try and explain what this code does. Let's assume there are eight threads. Let's define the += operator to mean to sum over the array. e.g. s0 += s1 is

for(int i=0; i<bins; i++) s0[i] += s1[i]

then this code would do

n   thread0     thread1    thread2    thread4
4   s0 += s1    s2 += s3   s4 += s5   s6 +=s7
2   s0 += s2    s4 += s6
1   s0 += s4

But this code is not ideal as I would like it.

One problem is that there are a few implicit barriers which require all the threads to sync. These barriers should not be necessary. The first barrier is between filling the arrays and doing the reduction. The second barrier is in the #pragma omp for declaration in the reduction. But I can't use the nowait clause with this method to remove the barrier.

Another problem is that there are several threads that don't need to be used. For example with eight threads. The first step in the reduction only needs four threads, the second step two threads, and the last step only one thread. However, this method would involve all eight threads in the reduction. Although, the other threads don't do much anyway and should go right to the barrier and wait so it's probably not much of an issue.

My instinct is that a better method can be found using the omp task clause. Unfortunately I have little experience with the task clause and all my efforts so far with it do a reduction better than what I have now have failed.

Can someone suggest a better solution to do the reduction in logarithmic time using e.g. OpenMP's task clause?


I found a method which solves the barrier problem. This reduces asynchronously. The only remaining problem is that it still puts threads which don't participate in the reduction into a busy loop. This method uses something like a stack to push pointers to the stack (but never pops them) in critical sections (this was one of the keys as critical sections don't have implicit barriers. The stack is operated on serially but the reduction in parallel.

Here is a working example.

#include <stdio.h>
#include <omp.h>
#include <stdlib.h>
#include <string.h>

void foo6() {
    int nthreads = 13;
    omp_set_num_threads(nthreads);
    int bins= 21;
    int a[bins];
    int **at;
    int m = 0;
    int nsums = 0;
    for(int i = 0; i<bins; i++) a[i] = 0;
    #pragma omp parallel
    {
        int n = omp_get_num_threads();
        int ithread = omp_get_thread_num();
        #pragma omp single
        at = (int**)malloc(sizeof *at * n * 2);
        int* a_private = (int*)malloc(sizeof *a_private * bins);

        //arbitrary fill function
        for(int i = 0; i<bins; i++) a_private[i] = i + omp_get_thread_num();

        #pragma omp critical (stack_section)
        at[nsums++] = a_private;

        while(nsums<2*n-2) {
            int *p1, *p2;
            char pop = 0;
            #pragma omp critical (stack_section)
            if((nsums-m)>1) p1 = at[m], p2 = at[m+1], m +=2, pop = 1;
            if(pop) {
                for(int i = 0; i<bins; i++) p1[i] += p2[i];
                #pragma omp critical (stack_section)
                at[nsums++] = p1;
            }
        }

        #pragma omp barrier
        #pragma omp single
        memcpy(a, at[2*n-2], sizeof **at *bins);
        free(a_private);
        #pragma omp single
        free(at);
    }
    for(int i = 0; i<bins; i++) printf("%d ", a[i]); puts("");
    for(int i = 0; i<bins; i++) printf("%d ", (nthreads-1)*nthreads/2 +nthreads*i); puts("");
}

int main(void) {
    foo6();
}

I sill feel a better method may be found using tasks which does not put the threads not being used in a busy loop.

Community
  • 1
  • 1
Z boson
  • 32,619
  • 11
  • 123
  • 226
  • 3
    Why do you not want to use an OpenMP reduction? – Jeff Hammond Feb 27 '16 at 21:34
  • 1
    @Jeff, because `reduction` is a black box. Because I don't know how it works or if even uses a `log(nthreads)` reduction. Because `reduction` does not work when the operations don't commute. Because I think it's useful to know how to do things "by hand". Because I think OpenMP is a good paradigm to teach parallel programming concepts. – Z boson Feb 28 '16 at 19:34
  • 3
    Have you read the specification or any of the OSS runtimes (in GCC and Clang, or Pathscale's)? It's only a black box if you refuse to open the lid. – Jeff Hammond Feb 28 '16 at 19:37
  • @Jeff, fair point. But that does not satisfy my other "becauses". At the very least it won't handle cases where operations don't commute or for compilers which don't support custom reductions which require OpenMP 4.0. – Z boson Feb 28 '16 at 19:42
  • What built-in reduce operators don't commute? And please don't say "-" because the reduce op makes no sense and doesn't do what you think it does anyways. – Jeff Hammond Feb 28 '16 at 19:44
  • Ok sorry you want reductions that aren't supported by OpenMP as built-ins. What are they? – Jeff Hammond Feb 28 '16 at 19:46
  • @Jeff, that's a good point. I suspect that `log(nthreads)` reductions are not useful for the built in operations. I already [asked a question on this](http://stackoverflow.com/questions/21603288/reduction-with-openmp-linear-merging-or-lognumber-of-threads-merging). It took me a while to understand when `log(nthreads)` would be useful and I think I figured it out. It's for cases such as an array reduction (or even more complicated) cases i.e. only for some custom reduction. But I would not be surprised if OpenMP implementations just reduce in O(nthreads) time. – Z boson Feb 28 '16 at 19:53
  • @Jeff, BTW, matrix multiplication is an example of an operation that does not commute but can still be done in parallel. You can read more about doing a reduction with OpenMP when operations don't commute [here](http://stackoverflow.com/a/35479939/2542702). – Z boson Feb 28 '16 at 19:58
  • 2
    OpenMP should implement the fastest reduction known to the implementers. I expect many are log(N). Whether or not you can see this in measurements depends on how you construct them. Many experiments will be dominated by memory cost or runtime overheads if you don't amortize out parallel region costs. – Jeff Hammond Feb 28 '16 at 20:02
  • @Jeff, do you have a source for that? I would like to read about that. I agree that looking at the source code is the best way to go though. I guess I can find it online somwhere? Do you know the particular source file which does this in GCC? In any case I still want to know how to do this myself with OpenMP. – Z boson Feb 28 '16 at 20:06
  • Google for GOMP or LLVM OpenMP runtime. The latter has its own website. Git clone and Git grep will get you there. – Jeff Hammond Feb 28 '16 at 20:07
  • @Jeff, just to be clear, I asked this question independent of how OpenMP implements the `reduction` directive. It does not matter to me if it does in in `log(nthreads)` operations. – Z boson Mar 01 '16 at 18:56
  • I'm... confused. Your code has no timing within it. Can you define "better" in your question above? Does it mean "takes less time"? Because I do not agree that logarithmic time == better. Due to issues like false sharing, synchronization and coherency overhead, I would imagine you could do something like 20-50 accumulations in a single-threaded loop in the same time it would take you to perform two stages of two-way reduction. For that reason the collapse factor per stage should be much higher than 2 - maybe 10, 20 or more. And, well, most machines don't have such obscene numbers of CPUs, – Iwillnotexist Idonotexist Mar 01 '16 at 22:52
  • @IwillnotexistIdonotexist, it's a two stage process. An example. You want to fill a historgram with `N` bins in parallel. Let's say you have `t` cores. Then create `t` empty historgrams (one for each thread). Let's assume you have `n` items to fill. You fill each private histogram in parallel and it takes `A*n/t` seconds (where A is some constant). That's the first stage. In the second stage you need to add each private histogram. You can do this in `t` operations or `log(t)` operations. – Z boson Mar 02 '16 at 09:34
  • 2
    @IwillnotexistIdonotexist, normally `n >> N` so it does not really matter how you do the second stage because the time is completely dominated by the first stage. But what if `n ≈ N`? In this case the second stage will not be insignificant. I admit that I should have come up with a example to show this (I mean with timing) but everyone on SO for OpenMP says to use the `reduction` clause because it may do the second stage in `log(t)` operations. And so I think this might be an example where it is. – Z boson Mar 02 '16 at 09:39
  • @IwillnotexistIdonotexist I am basically trying to come up with a better method for the second stage of [this question](https://stackoverflow.com/questions/16789242/fill-histograms-array-reduction-in-parallel-with-openmp-without-using-a-critic) but I admit that for just reducing to a scaler value that `log(t)` is probably not any better ([I suspect it's even worse but not enough to notice](https://stackoverflow.com/questions/21603288/reduction-with-openmp-linear-merging-or-lognumber-of-threads-merging)). – Z boson Mar 02 '16 at 09:42

1 Answers1

11

Actually, it is quite simple to implement that cleanly with tasks using a recursive divide-and-conquer approach. This is almost textbook code.

void operation(int* p1, int* p2, size_t bins)
{
    for (int i = 0; i < bins; i++)
        p1[i] += p2[i];
}

void reduce(int** arrs, size_t bins, int begin, int end)
{
    assert(begin < end);
    if (end - begin == 1) {
        return;
    }
    int pivot = (begin + end) / 2;
    /* Moving the termination condition here will avoid very short tasks,
     * but make the code less nice. */
#pragma omp task
    reduce(arrs, bins, begin, pivot);
#pragma omp task
    reduce(arrs, bins, pivot, end);
#pragma omp taskwait
    /* now begin and pivot contain the partial sums. */
    operation(arrs[begin], arrs[pivot], bins);
}

/* call this within a parallel region */
#pragma omp single
reduce(at, bins, 0, n);

As far as i can tell, there are no unnecessary synchronizations and there is no weird polling on critical sections. It also works naturally with a data size different than your number of ranks. I find it very clean and easy to understand. So I do indeed think this is better than both of your solutions.

But let's look at how it performs in practice*. For that we can use Score-p and Vampir:

*bins=10000 so the reduction actually takes a little bit of time. Executed on a 24-core Haswell system w/o turbo. gcc 4.8.4, -O3. I added some buffer around the actual execution to hide initialization/post-processing

execution of the three variants

The picture reveals what is happening at any thread within the application on a horizontal time-axis. The tree implementations from top to bottom:

  1. omp for loop
  2. omp critical kind of tasking.
  3. omp task

This shows nicely how the specific implementations actually execute. Now it seems that the for loop is actually the fastest, despite the unnecessary synchronizations. But there are still a number of flaws in this performance analysis. For example, I didn't pin the threads. In practice NUMA (non-uniform memory access) matters a lot: Does the core does have this data in it's own cache / memory of it's own socket? This is where the task solution becomes non-deterministic. The very significant variance among repetitions is not considered in the simple comparison.

If the reduction operation becomes variable in runtime, then the task solution will become better than thy synchronized for loop.

The critical solution has some interesting aspect, the passive threads are not continuously waiting, so they will more likely consume CPU resources. This can be bad for performance e.g. in case of turbo mode.

Remember that the task solution has more optimization potential by avoiding spawning tasks that immediately return. How these solutions perform also highly depends on the specific OpenMP runtime. Intel's runtime seems to do much worse for tasks.

My recommendation is:

  • Implement the most maintainable solution with optimal algorithmic complexity
  • Measure which parts of the code actually matter for run-time
  • Analyze based on actual measurements what is the bottleneck. In my experience it is more about NUMA and scheduling rather than some unnecessary barrier.
  • Perform the micro-optimization based on your actual measurements

Linear solution

Here is the timeline for the linear proccess_data_v1 from this question.

parallel timeline

OpenMP 4 Reduction

So I thought about OpenMP reduction. The tricky part seems to be getting the data from the at array inside the loop without a copy. I do initialize the worker array with NULL and simply move the pointer the first time:

void meta_op(int** pp1, int* p2, size_t bins)
{
    if (*pp1 == NULL) {
        *pp1 = p2;
        return;
    }
    operation(*pp1, p2, bins);
}

// ...

// declare before parallel region as global
int* awork = NULL;

#pragma omp declare reduction(merge : int* : meta_op(&omp_out, omp_in, 100000)) initializer (omp_priv=NULL)

#pragma omp for reduction(merge : awork)
        for (int t = 0; t < n; t++) {
            meta_op(&awork, at[t], bins);
        }

Surprisingly, this doesn't look too good:

timeline for omp4 reduction

top is icc 16.0.2, bottom is gcc 5.3.0, both with -O3.

Both seem to implement the reduction serialized. I tried to look into gcc / libgomp, but it's not immediately apparent to me what is happening. From intermediate code / disassembly, they seem to be wrapping the final merge in a GOMP_atomic_start/end - and that seems to be a global mutex. Similarly icc wraps the call to the operation in a kmpc_critical. I suppose there wasn't much optimization going into costly custom reduction operations. A traditional reduction can be done with a hardware-supported atomic operation.

Notice how each operation is faster because the input is cached locally, but due to the serialization it is overall slower. Again this is not a perfect comparison due to high variances, and earlier screenshots were with different gcc version. But the trend is clear, and I also have data on the cache effects.

Community
  • 1
  • 1
Zulan
  • 21,896
  • 6
  • 49
  • 109
  • I tested your code. It works! This is exactly the kind of answer I was looking for. Thanks! The fact that it is a textbook example makes it even better. I am happy to see you were able to distil the essence of my question despite some ambiguity. The image is awesome! It really shows visually what I was trying to say in words. – Z boson Mar 05 '16 at 14:17
  • Note that your method using tasks still requires a barrier between the first and second stage whereas my method with critical sections (my second method) does not. – Z boson Mar 05 '16 at 14:18
  • In case you are interested, I originally used a more optimized version of my first method. Instead of using `#pragma omp for` I used `if(omp_get_thread_num() == 2*i*m) { suma[2*i*m] += suma[2*i*m+m]; }`. This insures that the array at iteration `2*i*m` is used by thread `2*j*m` so it should be more cache friendly. – Z boson Mar 05 '16 at 14:22
  • It would be interesting to see this graph using the `O(n)` method with critical sections. I mean the method defined in `proccess_data_v1` in [this](http://stackoverflow.com/q/16789242/2542702) question. It should show that only one thread at a time does the reduction and I would expect it to be the slowest of the methods. – Z boson Mar 05 '16 at 14:28
  • 1
    @Zboson, with the current implementation, the barrier is required. You could however run the "fill function" as a task at the termination condition of the recursion. Then the reduction can start independently. – Zulan Mar 05 '16 at 14:59
  • It would also be interesting to see the graph using OpenMP's reduction clause using `omp declare reduction` for the array reduction. Maybe I'll make my own plot at some point. – Z boson Mar 05 '16 at 15:08
  • 1
    @Zboson, i added a trace from `process_data_v1` confirming the assumption. – Zulan Mar 05 '16 at 15:17
  • 1
    @Zboson I tried the OpenMP4 `omp declare reduction`, edited the answer. I am quite surprised by the result. – Zulan Mar 06 '16 at 22:36
  • Woah! Awesome that you checked `reduction`. Thank you so much! Everyone (I have asked) says OpenMP implementations may use `log(threads)` but I have never seen any proof of this and now we see it is linear (at least in this case on the CPU with ICC and GCC) just like I predicted. To be clear I did not ask this question to challenge the `reduction` clause. I asked this question mostly for education about algorithms for doing reductions and to learn more about `task`. – Z boson Mar 07 '16 at 07:52