1

I'm trying to make a for loop multi-threaded in C++ so that the calculation gets divided to the multiple threads. Yet it contains data that needs to be joined together in the order as they are.

So the idea is to first join the small bits on many cores (25.000+ loops) and then join the combined data once more at the end.

std::vector<int> ids;               // mappings
std::map<int, myData> combineData;  // data per id
myData outputData;                  // combined data based on the mappings
myData threadData;                  // data per thread

    #pragma parallel for default(none) private(data, threadData) shared(combineData)
        for (int i=0; i<30000; i++)
        {
            threadData += combineData[ids[i]];
        }

    // Then here I would like to get all the seperate thread data and combine them in a similar manner
    // I.e.: for each threadData:  outputData += threadData

What would be the efficient and good way to approach this?

How can I schedule the openmp loop so that the scheduling is split evenly into chunks

For example for 2 threads: [0, 1, 2, 3, 4, .., 14999] & [15000, 15001, 15002, 15003, 15004, .., 29999]

If there's a better way to join the data (which involves joining a lot of std::vectors together and some matrix math), yet preserve the order of additions pointers to that would help as well.

Added information

  • The addition is associative, though not commutative.
  • myData is not an intrinsic type. It's a class containing data as multiple std::vectors (and some data related to the Autodesk Maya API.)
  • Each cycle is doing a similar matrix multiplication to many points and adds these points to a vector (in theory the calculation time should stay roughly similar per cycle)

Basically it's adding mesh data (consisting of vectors of data) to eachother (combining meshes) though the order of the whole thing accounts for the index value of the vertices. The vertex index should be consistent and rebuildable.

Roy Nieterau
  • 1,226
  • 2
  • 15
  • 26
  • Is your operator on `myData` associative i.e. does `(A + B) + C` = A + (B + C)`? That's going to effective whether or not you can parallelize this. See my answer. – Z boson Sep 11 '13 at 20:47
  • Yes it is associative, though not commutative. That's why I need the joining of the thread data at the end in the correct order. – Roy Nieterau Sep 11 '13 at 23:57
  • I read the added information. If you are allowed to store index ID along with the data you aggregate, then you can do the operation in any order. If you need an array sorted by index value, you can always sort it afterwards (even in parallel) using a custom comparison operation based on the index ID. – Massimiliano Sep 12 '13 at 07:50

2 Answers2

2

This depends on a few properties of the the addition operator of myData. If the operator is both associative (A + B) + C = A + (B + C) as well as commutative A + B = B + A then you can use a critical section or if the data is plain old data (e.g. a float, int,...) a reduction.

However, if it's not commutative as you say (order of operation matters) but still associative you can fill an array with a number of elements equal to the number of threads of the combined data in parallel and then merge them in order in serial (see the code below. Using schedule(static) will split the chunks more or less evenly and with increasing thread number as you want.

If the operator is neither associative nor commutative then I don't think you can parallelize it (efficiently - e.g. try parallelizing a Fibonacci series efficiently).

std::vector<int> ids;               // mappings
std::map<int, myData> combineData;  // data per id
myData outputData;                  // combined data based on the mappings
myData *threadData;
int nthreads;
#pragma omp parallel
{
    #pragma omp single
    {
        nthreads = omp_get_num_threads();
        threadData = new myData[nthreads];
    }
    myData tmp;
    #pragma omp for schedule(static)
    for (int i=0; i<30000; i++) {
        tmp += combineData[ids[i]];
    }
    threadData[omp_get_thread_num()] = tmp;
}
for(int i=0; i<nthreads; i++) {
     outputData += threadData[i];
}
delete[] threadData;

Edit: I'm not 100% sure at this point if the chunks will assigned in order of increasing thread number with #pragma omp for schedule(static) (though I would be surprised if they are not). There is an ongoing discussion on this issue. Meanwhile, if you want to be 100% sure then instead of

#pragma omp for schedule(static)
for (int i=0; i<30000; i++) {
    tmp += combineData[ids[i]];
}

you can do

const int nthreads = omp_get_num_threads();
const int ithread = omp_get_thread_num();
const int start = ithread*30000/nthreads;
const int finish = (ithread+1)*30000/nthreads;
for(int i = start; i<finish; i++) {
     tmp += combineData[ids[i]];          
}

Edit:

I found a more elegant way to fill in parallel but merge in order

#pragma omp parallel
{
    myData tmp;
    #pragma omp for schedule(static) nowait 
    for (int i=0; i<30000; i++) {
        tmp += combineData[ids[i]];
    }
    #pragma omp for schedule(static) ordered 
    for(int i=0; i<omp_get_num_threads(); i++) {
        #pragma omp ordered
        outputData += tmp;
    }
}

This avoids allocating data for each thread (threadData) and merging outside the parallel region.

Community
  • 1
  • 1
Z boson
  • 32,619
  • 11
  • 123
  • 226
  • Looking at this page http://msdn.microsoft.com/en-us/library/x5aw0hdf.aspx it looks like schedule(static) doesn't divide it into ordered chunks where the chunk size is equal to numIterations/numThreads. (They are ordered yes, yet the chunk is of size 1 so each thread isn't fully behind the previous thread. Right? – Roy Nieterau Sep 11 '13 at 19:45
  • 1
    That example only shows the result when the chuck size is set. If the chunk size is not specified as in `schedule(static)` then it will divide them more or less equally. Note that the order that the threads run is more or less random but the way they are assigned in order in increasing thread number. They are two different things. – Z boson Sep 11 '13 at 19:59
  • @Massimiliano, well if I read Hristo Iliev's comment correctly [here](http://stackoverflow.com/questions/18719257/parallel-cumulative-prefix-sums-in-openmp-communicating-values-between-thread) it is guaranteed. But that's exactly why I asked [this](http://stackoverflow.com/questions/18746282/openmp-schedulestatic-with-no-chunk-size-specified-chunk-size-and-order-of-as/18750475#18750475) question. I don't know for sure myself. – Z boson Sep 11 '13 at 20:41
  • @RoyNieterau, since the operation is still associative I added some comments on how you can guarantee that each chunk is evenly distributed as well as being in order of increasing thread number. – Z boson Sep 12 '13 at 06:51
1

If you really want to preserve the same order as in the serial case, then there is no other way than doing it serially. In that case you can maybe try to parallelize the operations done in operator+=.

If the operations can be done randomly, but the reduction of the blocks has a specific order , then it may be worth having a look at TBB parallel_reduce. It will require you to write more code, but if I remember well you can define complex custom reduction operations.

If the order of the operations doesn't matter, then your snippet is almost complete. What it lacks is possibly a critical construct to aggregate private data:

std::vector<int> ids;               // mappings
std::map<int, myData> combineData;  // data per id
myData outputData;                  // combined data based on the mappings

#pragma omp parallel
{ 
    myData threadData;              // data per thread

    #pragma omp for nowait
    for (int ii =0; ii < total_iterations; ii++)
    {
        threadData += combineData[ids[ii]];
    }
    #pragma omp critical
    {
        outputData += threadData;
    }    
    #pragma omp barrier
    // From here on you are ensured that every thread sees 
    // the correct value of outputData 
 }

The schedule of the for loop in this case is not important for the semantic. If the overload of operator+= is a relatively stable operation (in terms of the time needed to compute it), then you can use schedule(static) which divides the iterations evenly among threads. Otherwise you can resort to other scheduling to balance the computational burden (e.g. schedule(guided)).

Finally if myData is a typedef of an intrinsic type, then you can avoid the critical section and use a reduction clause:

    #pragma omp for reduction(+:outputData)
    for (int ii =0; ii < total_iterations; ii++)
    {
        outputData += combineData[ids[ii]];
    }

In this case you don't need to declare anything explicitly as private.

Massimiliano
  • 7,842
  • 2
  • 47
  • 62
  • 1
    I'm not sure I agree that it can only be done serially. Consider matrix multiplication. It's not commutative. Let's say you have four matrices and you want the product ABCD you could have thread one do AB and thread two CD then take the product (AB)(CD) serially. That will still give the correct answer. See my answer. – Z boson Sep 11 '13 at 20:09
  • Another way to say this is that if the operation is still associative (AB)C = A(BC) then you can still operate on sub regions in parallel and then operate serially on the result of each sub region. – Z boson Sep 11 '13 at 20:18
  • @redrum Another way to say that the operation is still associative is to say that the order can be broken in a particular way. The serial order `(((A+B)+C) +D)` is not the same as `(A+B) + (C+D)`. – Massimiliano Sep 11 '13 at 20:27
  • @Massimilano, maybe were saying the same think now? `(((A+B)+C)+D)` should give the same result as `(A+B) + (B+D)` if the operation is associative. That's the point. – Z boson Sep 11 '13 at 20:32
  • @redrum Strictly speaking `preserve the order of additions` (OP question) doesn't permit an operation to be associative. – Massimiliano Sep 11 '13 at 20:36
  • @Massimilano, the only what to know is to ask the OP. – Z boson Sep 11 '13 at 20:46
  • @redrum In this case the addition is associative, though not commutative. – Roy Nieterau Sep 12 '13 at 05:35
  • Can't reduction be done on user defined classes? Why is is it restricted to intrinsic? – Cramer Sep 12 '13 at 07:26
  • 2
    @Cramer In the standard 3.1 there is this restriction `Aggregate types (including arrays), pointer types and reference types may not appear in a reduction clause`, so I assume that most user-defined classes won't be allowed. As soon as standard 4.0 will be implemented the following change will be instead available `The reduction clause (see Section 2.14.3.6 on page 167) was extended and the declare reduction construct (see Section 2.15 on page 180) was added to support user defined reductions` – Massimiliano Sep 12 '13 at 07:43
  • Well that's dumb. I suppose that's what you get for targeting C and assuming it's all good for C++ as well. – Cramer Sep 12 '13 at 12:15