3

Currently, I'm programming a parallel version of k-means++ using OpenMP and C. Until now, I'm implementing the initialization of centroids. If you are not familiar with this procedure, it works roughly as follows. Given a dataset (matrix) with n points, k centroids are initilized using a 'probability function', also known as roulette selection.

Suppose you have n=4 points and the following array of distances to some centroids:

distances = [2, 4, 6, 8]
dist_sum  = 20

From these, define an accumulated probability array by dividing each entry of distances by dist_sum, and adding the previous results, like this:

probs     = [0.1, 0.2, 0.3, 0.4] = [2/20, 4/20, 6/20, 8/20]
acc_probs = [0.1, 0.3, 0.6, 1.0]

Then, perform a roulette selection. Given a random number, say r=0.5, select the next point using r and acc_probs, iterating over acc_probs until r < acc_probs[i]. In this example, the selected point is i=2 because r < acc_probs[2].

problem In this case, I'm working with very large matrices (about n=16 000 000 of points). Despite the fact that this program gives the correct answer (i.e. a good initialization of centroids), it does not scale as well as expected. This function computes the initial centroids following this algorithm.

    double **parallel_init_centroids (double **dataset, int n, int d, int k, RngStream randomizer, long int *total_ops) {

    double dist=0, error=0, dist_sum=0, r=0, partial_sum=0, mindist=0;
    int cn=0, cd=0, ck = 0, cck = 0, idx = 0;
    ck = 0;

    double probs_sum = 0; // debug

    int  mink=0, id=0, cp=0;

    for (ck = 0; ck < k; ck++) {

        if ( ck == 0 ) {

            // 1. choose an initial centroid c_0 from dataset randomly
            idx = RngStream_RandInt (randomizer, 0, n-1);

        }  
        else {

            // 2. choose a successive centroid c_{ck} using roulette selection
            r = RngStream_RandU01 (randomizer);
            idx = 0;
            partial_sum = 0;
            for (cn=0; cn<n; cn++) {

                partial_sum = partial_sum + distances[cn]/dist_sum;

                if (r < partial_sum) {
                    idx = cn;
                    break;
                }
            }
        }

        // 3. copy centroid from dataset
        for (cd=0; cd<d; cd++)
            centroids[ck][cd] = dataset[idx][cd];

        // reset before parallel region
        dist_sum = 0;

        // -- parallel region --
        # pragma omp parallel shared(distances, clusters, centroids, dataset, chunk, dist_sum_threads, total_ops_threads) private(id, cn, cck, cd, cp, error, dist, mindist, mink)
        {
            id = omp_get_thread_num();
            dist_sum_threads[id] = 0;               // each thread reset its entry

            // parallel loop
            // 4. recompute distances against centroids
            # pragma omp for schedule(static,chunk)
            for (cn=0; cn<n; cn++) {

                mindist = DMAX;
                mink = 0;

                for (cck=0; cck<=ck; cck++) {

                    dist = 0;

                    for (cd=0; cd<d; cd++) {

                        error = dataset[cn][cd] - centroids[ck][cd];
                        dist = dist + (error * error);                  total_ops_threads[id]++;
                    }

                    if (dist < mindist) {
                        mindist = dist;
                        mink = ck;
                    }
                }

                distances[cn]           = mindist;
                clusters[cn]            = mink;
                dist_sum_threads[id]    += mindist;      // each thread contributes before reduction
            }
        }
        // -- parallel region --

        // 5. sequential reduction
        dist_sum = 0;
        for (cp=0; cp<p; cp++)
            dist_sum += dist_sum_threads[cp];
    }


    // stats
    *(total_ops) = 0;
    for (cp=0; cp<p; cp++)
        *(total_ops) += total_ops_threads[cp];

    // free it later
    return centroids;
}

As you can see, the parallel region computes the distance among n d-dimensional points against k d-dimensional centroids. This work is shared among p threads (up to 32). After the parallel region is finished, two arrays are filled: distances and dist_sum_threads. The first array is the same as the previous example, whereas the second array contains the accumulated distances gathered by each thread. Considering the previous example, if p=2 threads are available, then this array is defined as follows:

dist_sum_threads[0] = 6  ([2, 4])  # filled by thread 0
dist_sum_threads[1] = 14 ([6, 8])  # filled by thread 1

dist_sum is defined by adding each entry of dist_sum_threads. This function works as expected, but when the number of threads increases, the execution time grows. This figure shows some performance indicators.

What could be wrong with my implementation, particularly with openmp? In summary, only two pragmas were employed:

# pragma omp parallel ...
{
    get thread id

    # pragma omp for schedule(static,chunk)
    {
        compute distances ...
    }

    fill distances and dist_sum_threads[id]
}

In other words, I removed barriers, mutual exclusion access, and other things that could cause additional overhead. However, the execution time is worst as the number of threads increases.

Update

  • The previous code has been changed to be mcve. This snippet is similar to my previous code. In this case, the distance among n=100000 points and k=16 centroids is computed.
  • The execution time is measured using omp_get_wtime before and after the parallel region. The total time is stored in wtime_spent.
  • I include a reduction to compute dist_sum. However, it does not work as expected (it is commented as bad parallel reduction below). The correct value of dist_sum is 999857108020.0, but, when p threads are used to computed it, the result is 999857108020.0 * p, which is wrong.
  • The performance plot was updated
  • This is the main parallel function, the complete code is located here:

    double **parallel_compute_distances (double **dataset, int n, int d, int k, long int *total_ops) {
    
        double dist=0, error=0, mindist=0;
    
        int cn, cd, ck, mink, id, cp;
    
        // reset before parallel region
        dist_sum = 0;            
    
        // -- start time --
        wtime_start = omp_get_wtime ();
    
        // parallel loop
        # pragma omp parallel shared(distances, clusters, centroids, dataset, chunk, dist_sum, dist_sum_threads) private(id, cn, ck, cd, cp, error, dist, mindist, mink)
        {
            id = omp_get_thread_num();
            dist_sum_threads[id] = 0;               // reset
    
            // 2. recompute distances against centroids
            # pragma omp for schedule(static,chunk)
            for (cn=0; cn<n; cn++) {
    
                mindist = DMAX;
                mink = 0;
    
                for (ck=0; ck<k; ck++) {
    
                    dist = 0;
    
                    for (cd=0; cd<d; cd++) {
    
                        error = dataset[cn][cd] - centroids[ck][cd];
                        dist = dist + (error * error);                               total_ops_threads[id]++;
                    }
    
                    if (dist < mindist) {
                        mindist = dist;
                        mink = ck;
                    }
                }
    
                distances[cn]           = mindist;
                clusters[cn]            = mink;
                dist_sum_threads[id]    += mindist;
            }
    
    
            // bad parallel reduction
            //#pragma omp parallel for reduction(+:dist_sum)
            //for (cp=0; cp<p; cp++){             
            //    dist_sum += dist_sum_threads[cp];
            //}
    
        }
    
    
        // -- end time --
        wtime_end = omp_get_wtime ();
    
        // -- total wall time --
        wtime_spent = wtime_end - wtime_start;
    
        // sequential reduction
        for (cp=0; cp<p; cp++)       
            dist_sum += dist_sum_threads[cp];
    
    
        // stats
        *(total_ops) = 0;
        for (cp=0; cp<p; cp++)
            *(total_ops) += total_ops_threads[cp];
    
        return centroids;
    }
    
Community
  • 1
  • 1
auraham
  • 1,679
  • 2
  • 20
  • 27

1 Answers1

2

Your code not being a mcve I can only emit hypothesis here. However, here is what I think (might) happen (in no specific order of importance):

  • You suffer from false sharing when you update dist_sum_threads and total_ops_threads. You can avoid the former altogether by simply declaring reduction( +: dist_sum ) and using directly dist_sum inside the parallel region. You can do likewise for total_ops_threads using a local total_ops declared reduction(+) as well, and which you accumulate into *total_ops at the end. (BTW, dist_sum is computed but never used...)

  • The code looks memory bound anyway since you have plenty of memory accesses for almost no computations. Therefore, expected speed-ups will mostly depend on your memory bandwidth and the number of memory controller you can access while parallelising the code. See this epic answer for more details.

  • In light of the aforementioned likely memory-bound character of your problem, try to play with memory placement (numactl possibly and/or thread affinity with proc_bind). You can also try to play with the thread scheduling policy and/or try to see if some loop tiling couldn't be applied to your problem for blocking data into cache.

  • You didn't detail the way you measured your times, but be aware that the speed-ups only make sense in the context of wall-clock time, not CPU time. Please use omp_get_wtime() for any such measurements.

Try to address these points and to evaluate your actual potential speed-up depending on you memory architecture. If you still feel that you don't achieve what you should, then just update your question.


EDIT:

Since you provided a full example, I managed to experiment a bit with your code and to implement the modifications I had in mind (to reduce the false sharing mostly).

Here is what the function no looks like:

double **parallel_compute_distances( double **dataset, int n, int d,
                                     int k, long int *total_ops ) {
    // reset before parallel region
    dist_sum = 0;            

    // -- start time --
    wtime_start = omp_get_wtime ();

    long int tot_ops = 0;

    // parallel loop
    # pragma omp parallel for reduction( +: dist_sum, tot_ops )
    for ( int cn = 0; cn < n; cn++ ) {
        double mindist = DMAX;
        int mink = 0;
        for ( int ck = 0; ck < k; ck++ ) {
            double dist = 0;
            for ( int cd = 0; cd < d; cd++ ) {
                double error = dataset[cn][cd] - centroids[ck][cd];
                dist += error * error;
                tot_ops++;
            }
            if ( dist < mindist ) {
                mindist = dist;
                mink = ck;
            }
        }
        distances[cn] = mindist;
        clusters[cn] = mink;
        dist_sum += mindist;
    }

    // -- end time --
    wtime_end = omp_get_wtime ();

    // -- total wall time --
    wtime_spent = wtime_end - wtime_start;

    // stats
    *(total_ops) = tot_ops;

    return centroids;
}

So, a few comments:

  • As explained before, dist_sum and a local variable for the total number of operations (tot_ops) are now declared reduction(+:). This avoid accessing the same array with one thread per index, which triggers false sharing (this is almost the perfect case for triggering it). I used a local variable instead of total_ops as the later being a pointer, it cannot be used in the reduction clause directly. However, updating it at the end with tot_ops does the job.

  • I delayed all the variable declarations as much as possible. This is good practice because it spares most of the private declarations, which are usually the main pitfall for OpenMP programmers. Now you only have to think about the two reduction variables, and the two arrays, which are obviously shared and therefore do not need any extra declaration. This simplifies a lot the parallel directive and helps focussing on what matters

  • Now that the thread id isn't needed any more, the parallel and for directives can be merged for better readability (and possibly performance too).

  • I removed the schedule clause to let the compiler and/or run time library use their default. I would only go for a different scheduling policy if I have a good reason to.

With that, on my dual-core laptop sing GCC 5.3.0 and compiling with -std=c99 -O3 -fopenmp -mtune=native -march=native, I get consistent results for various number of threads and a speed-up of 2x for 2 threads.

On a 10-core machine, using Intel compiler and -std=c99 -O3 -xhost -qopenmp, I get a linear speed-up from 1 to 10 threads...

Even on a Xeon Phi KNC do I get near-linear speed-up from 1 to 60 threads (then using more hardware threads still gives some speed-ups, but not to the same extend).

The speed-ups observed made me realised that unlike what I assumed, the code is not memory bound, since the arrays you access are actually very well cached. The reason for that is that you only access dataset[cn][cd] and centroids[ck][cd] which 2nd dimensions are very small (40 and 16) so fit nicely in cache, while the chunk of dataset to load for next cn index can be prefetched effectively.

Do you still experience scalability issues with this version of the code?

Community
  • 1
  • 1
Gilles
  • 9,269
  • 4
  • 34
  • 53
  • "Even on a Xeon Phi KNC I get near-linear speed-up from 1 to 60 threads (then using more hardware threads still gives some speed-ups, but not to the same extent)." The KNC has 60 cores but can run 4 threads/core. Therefore (assuming you're using scatter affinity), the first 60 threads will each have a whole core, but above 60 threads you'll have multiple HW threads on the same core. You therefore shouldn't expect the same performance gain. It's best to plot scaling here with cores on x and separate lines for 1T/C, 2T/C, 4T/C. (The KMP_PLACE_THREADS envirable can help) – Jim Cownie Mar 03 '16 at 10:23