Currently, I'm programming a parallel version of k-means++ using OpenMP and C. So far, I have been implementing the initialization of the centroids. If you are not familiar with this procedure, it works roughly as follows. Given a dataset (matrix) with n points, k centroids are initialized using a 'probability function', also known as roulette selection.
Suppose you have n=4 points and the following array of distances to some centroids:
distances = [2, 4, 6, 8]
dist_sum = 20
From these, define an accumulated probability array by dividing each entry of distances by dist_sum and keeping a running sum of the previous results, like this:
probs     = [0.1, 0.2, 0.3, 0.4] = [2/20, 4/20, 6/20, 8/20]
acc_probs = [0.1, 0.3, 0.6, 1.0]
Then, perform a roulette selection. Given a random number, say r=0.5, select the next point using r and acc_probs, iterating over acc_probs until r < acc_probs[i]. In this example, the selected point is i=2 because r < acc_probs[2].
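To make the selection step concrete, here is a minimal standalone sketch in plain C (the helper name roulette_select and the hard-coded r = 0.5 are only for illustration):

#include <stdio.h>

// Roulette selection over the accumulated probabilities (prefix sum of distances[i] / dist_sum).
int roulette_select (const double *distances, int n, double dist_sum, double r) {
    double partial_sum = 0.0;
    for (int i = 0; i < n; i++) {
        partial_sum += distances[i] / dist_sum;   // acc_probs[i]
        if (r < partial_sum)
            return i;
    }
    return n - 1;   // guard against round-off when r is very close to 1.0
}

int main (void) {
    double distances[] = {2.0, 4.0, 6.0, 8.0};
    double dist_sum = 20.0;
    int idx = roulette_select (distances, 4, dist_sum, 0.5);
    printf ("selected point: %d\n", idx);   // prints 2, as in the example above
    return 0;
}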
Problem
In this case, I'm working with very large matrices (about n = 16,000,000 points). Despite the fact that this program gives the correct answer (i.e. a good initialization of the centroids), it does not scale as well as expected. The following function computes the initial centroids using this algorithm.
double **parallel_init_centroids (double **dataset, int n, int d, int k, RngStream randomizer, long int *total_ops) {
    // NOTE: distances, clusters, centroids, dist_sum_threads, total_ops_threads,
    // chunk, p and DMAX are assumed to be declared at file scope (see the complete code).
    double dist = 0, error = 0, dist_sum = 0, r = 0, partial_sum = 0, mindist = 0;
    int cn = 0, cd = 0, ck = 0, cck = 0, idx = 0;
    int mink = 0, id = 0, cp = 0;
    double probs_sum = 0; // debug

    for (ck = 0; ck < k; ck++) {
        if (ck == 0) {
            // 1. choose an initial centroid c_0 from dataset randomly
            idx = RngStream_RandInt (randomizer, 0, n-1);
        }
        else {
            // 2. choose a successive centroid c_{ck} using roulette selection
            r = RngStream_RandU01 (randomizer);
            idx = 0;
            partial_sum = 0;
            for (cn = 0; cn < n; cn++) {
                partial_sum = partial_sum + distances[cn] / dist_sum;
                if (r < partial_sum) {
                    idx = cn;
                    break;
                }
            }
        }

        // 3. copy centroid from dataset
        for (cd = 0; cd < d; cd++)
            centroids[ck][cd] = dataset[idx][cd];

        // reset before parallel region
        dist_sum = 0;

        // -- parallel region --
        # pragma omp parallel shared(distances, clusters, centroids, dataset, chunk, dist_sum_threads, total_ops_threads) private(id, cn, cck, cd, cp, error, dist, mindist, mink)
        {
            id = omp_get_thread_num();
            dist_sum_threads[id] = 0;   // each thread resets its entry

            // 4. recompute distances against centroids (parallel loop)
            # pragma omp for schedule(static, chunk)
            for (cn = 0; cn < n; cn++) {
                mindist = DMAX;
                mink = 0;
                for (cck = 0; cck <= ck; cck++) {
                    dist = 0;
                    for (cd = 0; cd < d; cd++) {
                        error = dataset[cn][cd] - centroids[cck][cd];
                        dist = dist + (error * error);
                        total_ops_threads[id]++;
                    }
                    if (dist < mindist) {
                        mindist = dist;
                        mink = cck;
                    }
                }
                distances[cn] = mindist;
                clusters[cn]  = mink;
                dist_sum_threads[id] += mindist;   // each thread contributes before the reduction
            }
        }
        // -- end of parallel region --

        // 5. sequential reduction
        dist_sum = 0;
        for (cp = 0; cp < p; cp++)
            dist_sum += dist_sum_threads[cp];
    }

    // stats
    *(total_ops) = 0;
    for (cp = 0; cp < p; cp++)
        *(total_ops) += total_ops_threads[cp];

    // free it later
    return centroids;
}
As you can see, the parallel region computes the distances between n d-dimensional points and k d-dimensional centroids. This work is shared among p threads (up to 32). After the parallel region finishes, two arrays have been filled: distances and dist_sum_threads. The first array is the same as in the previous example, whereas the second contains the partial distance sums gathered by each thread. Considering the previous example, if p=2 threads are available, this array is defined as follows:
dist_sum_threads[0] = 6   ([2, 4])  # filled by thread 0
dist_sum_threads[1] = 14  ([6, 8])  # filled by thread 1
dist_sum is then obtained by adding up the entries of dist_sum_threads. This function works as expected, but as the number of threads increases, the execution time grows. This figure shows some performance indicators.
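To make this accumulation and reduction concrete, here is a minimal standalone sketch of the per-thread partial sums followed by the sequential reduction, using the four distances from the earlier example (the fixed MAX_THREADS and num_threads(2) are only for illustration):

#include <stdio.h>
#include <omp.h>

#define MAX_THREADS 32

int main (void) {
    double distances[] = {2.0, 4.0, 6.0, 8.0};
    int n = 4;
    double dist_sum_threads[MAX_THREADS] = {0};
    double dist_sum = 0.0;

    # pragma omp parallel num_threads(2) shared(distances, dist_sum_threads)
    {
        int id = omp_get_thread_num();
        dist_sum_threads[id] = 0.0;                  // each thread resets its own entry

        # pragma omp for schedule(static)
        for (int cn = 0; cn < n; cn++)
            dist_sum_threads[id] += distances[cn];   // no locking: each thread writes only its own slot
    }

    // sequential reduction over the per-thread partial sums
    for (int cp = 0; cp < MAX_THREADS; cp++)
        dist_sum += dist_sum_threads[cp];

    printf ("dist_sum = %.1f\n", dist_sum);          // 20.0 (6 from thread 0 + 14 from thread 1)
    return 0;
}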
What could be wrong with my implementation, particularly with OpenMP? In summary, only two pragmas were employed:
# pragma omp parallel ...
{
    get thread id

    # pragma omp for schedule(static, chunk)
    {
        compute distances ...
    }

    fill distances and dist_sum_threads[id]
}
In other words, I removed barriers, mutual exclusion accesses, and other things that could cause additional overhead. However, the execution time gets worse as the number of threads increases.
Update

- The previous code has been changed into an MCVE. The snippet below is similar to my previous code; in this case, the distances between n=100000 points and k=16 centroids are computed.
- The execution time is measured with omp_get_wtime before and after the parallel region. The total time is stored in wtime_spent.
- I included a reduction to compute dist_sum. However, it does not work as expected (it is commented as "bad parallel reduction" below). The correct value of dist_sum is 999857108020.0, but when p threads are used to compute it, the result is 999857108020.0 * p, which is wrong.
- The performance plot was updated.

This is the main parallel function; the complete code is located here:
double **parallel_compute_distances (double **dataset, int n, int d, int k, long int *total_ops) {
    double dist = 0, error = 0, mindist = 0;
    int cn, cd, ck, mink, id, cp;

    // reset before parallel region
    dist_sum = 0;

    // -- start time --
    wtime_start = omp_get_wtime ();

    // parallel loop
    # pragma omp parallel shared(distances, clusters, centroids, dataset, chunk, dist_sum, dist_sum_threads) private(id, cn, ck, cd, cp, error, dist, mindist, mink)
    {
        id = omp_get_thread_num();
        dist_sum_threads[id] = 0;   // reset

        // 2. recompute distances against centroids
        # pragma omp for schedule(static, chunk)
        for (cn = 0; cn < n; cn++) {
            mindist = DMAX;
            mink = 0;
            for (ck = 0; ck < k; ck++) {
                dist = 0;
                for (cd = 0; cd < d; cd++) {
                    error = dataset[cn][cd] - centroids[ck][cd];
                    dist = dist + (error * error);
                    total_ops_threads[id]++;
                }
                if (dist < mindist) {
                    mindist = dist;
                    mink = ck;
                }
            }
            distances[cn] = mindist;
            clusters[cn]  = mink;
            dist_sum_threads[id] += mindist;
        }

        // bad parallel reduction
        //#pragma omp parallel for reduction(+:dist_sum)
        //for (cp=0; cp<p; cp++){
        //    dist_sum += dist_sum_threads[cp];
        //}
    }

    // -- end time --
    wtime_end = omp_get_wtime ();

    // -- total wall time --
    wtime_spent = wtime_end - wtime_start;

    // sequential reduction
    for (cp = 0; cp < p; cp++)
        dist_sum += dist_sum_threads[cp];

    // stats
    *(total_ops) = 0;
    for (cp = 0; cp < p; cp++)
        *(total_ops) += total_ops_threads[cp];

    return centroids;
}