I'm trying to parallelize a merge-sort algorithm. What I'm doing is dividing the input array among the threads, then merging the threads' results. The way I'm trying to merge the results is something like this:
| thread 0          | thread 1 | thread 2     | thread 3 |
|-------------------|----------|--------------|----------|
| sort(A0)          | sort(A1) | sort(A2)     | sort(A3) |
| merge(A0,A1)      |          | merge(A2,A3) |          |
| merge(A0A1, A2A3) |          |              |          |
So, at the end of my function `sortManager` I call the function `mergeThreadResults`, which should implement the above logic. In it I iterate over pairs of threads and merge their results. Then, if needed, I merge the remaining items onto thread 0. It looks like this:
void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
    /*
     * Tree-merge the per-thread sorted slices of sortingArray (sleep-based version).
     * At step iter (2, 4, 8, ...) a thread whose rank is a multiple of iter merges
     * its run [myLeft..myRight] with the following run, which ends just before
     * slice nextThread. Afterwards thread 0 absorbs whatever is left when its run
     * does not yet reach the end of the array (thread counts that are not a
     * power of two).
     * NOTE(review): sleep() only makes it likely that the other threads' work is
     * finished; nothing here actually waits for it.
     */
    // First thread after my run. Initialized so the tail check below is defined
    // even when the loop never runs (threads < 2).
    int nextThread = (int)myRank + 1;
    int iter = 2;
    while (iter <= threads) {
        if (myRank % iter != 0) {
            break;  // this thread's run has been absorbed by a lower rank
        }
        // Assign (do not redeclare) nextThread so the tail check sees the final value.
        nextThread = (myRank + iter) < threads ? (int)(myRank + iter) : threads;
        int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
        printf("Merging threads %ld to %d\n", myRank, nextThread);
        merge(sortingArray, myLeft, myRight, nextThreadRight);
        sleep(3); // <- sleep
        myRight = nextThreadRight;
        iter = iter * 2;
    }
    // Thread 0's run stops before the last slice: merge in the remainder.
    if (myRank == 0 && nextThread < threads) {
        int nextThreadRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, nextThreadRight);
    }
}
It appears to be working as intended. The problem is that I'm using the `sleep` function to synchronize the threads, which is far from the best approach, so I'm trying to implement a barrier with pthreads. In it I try to calculate how many threads will reach the barrier in each cycle and pass that as the `breakpoint`. When all of those threads are at the same point I release them to call the merge function, and they wait again in the next cycle. This is what I've tried:
pthread_mutex_lock(&mutex);
counter++;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
pthread_cond_broadcast(&cond_var);
} else {
while (pthread_cond_wait(&cond_var, &mutex) != 0);
}
pthread_mutex_unlock(&mutex);
But it's not working as intended: some merges trigger before the previous cycle has fully ended, leaving me with a partially sorted array.
This is a minimal example of my code for testing:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include <unistd.h>
// Global state shared by all sorting threads.
int sortingArray[20] = {5,-4,3,-1,-2,3,1,2,-2,-1,-2,-1,-2,-3,4,1234,534,123,87,123};
int counter = 0;
// Statically initialized: nothing in this program calls pthread_mutex_init or
// pthread_cond_init, so these must be valid before any thread touches them.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_var = PTHREAD_COND_INITIALIZER;
// Per-thread work description handed to each thread at creation.
struct ThreadTask {
    long rank;    // index of the thread, 0..threads-1
    int size;     // number of elements being sorted
    int threads;  // total number of worker threads
};
void merge(int arr[], int left, int mid, int right) {
    /*
     * Merge the adjacent sorted runs arr[left..mid] and arr[mid+1..right]
     * (all bounds inclusive) into one sorted run, in place. Stable: on ties the
     * element from the left run comes first. Either run may be empty.
     * Exits the process if the temporary buffers cannot be allocated.
     */
    int n1 = mid - left + 1;
    int n2 = right - mid;
    if (n1 <= 0 || n2 <= 0) {
        return;  // one run is empty: the other is already sorted in place
    }
    // Copy both runs out so arr can be overwritten front to back. No INT_MAX
    // sentinels: they made the loop read past the buffers when the data itself
    // contained INT_MAX.
    int *L = malloc((size_t)n1 * sizeof *L);
    int *R = malloc((size_t)n2 * sizeof *R);
    if (L == NULL || R == NULL) {
        fprintf(stderr, "Fatal: failed to allocate memory fo temp arrays.");
        exit(EXIT_FAILURE);
    }
    memcpy(L, arr + left, (size_t)n1 * sizeof *L);
    memcpy(R, arr + mid + 1, (size_t)n2 * sizeof *R);

    int i = 0;
    int j = 0;
    int k = left;
    while (i < n1 && j < n2) {
        if (L[i] <= R[j]) {
            arr[k++] = L[i++];
        } else {
            arr[k++] = R[j++];
        }
    }
    // At most one of these copies anything: the leftovers of the unfinished run.
    while (i < n1) {
        arr[k++] = L[i++];
    }
    while (j < n2) {
        arr[k++] = R[j++];
    }
    free(L);
    free(R);
}
void mergeSort(int arr[], int left, int right) {
    /* Recursively sort arr[left..right] (inclusive bounds) in place. */
    if (left >= right) {
        return;  // zero or one element: nothing to do
    }
    int half = left + (right - left) / 2;  // avoids overflow of left + right
    mergeSort(arr, left, half);
    mergeSort(arr, half + 1, right);
    merge(arr, left, half, right);
}
void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
int nextThread;
int iter = 2;
while (iter <= threads) {
int nextThread = (myRank+1*iter) < threads ? (myRank+1*iter) : threads;
int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
printf("Merging threads %ld to %d\n", myRank, nextThread);
if (myRank % iter != 0) {
break;
}
// barrier
pthread_mutex_lock(&mutex);
counter++;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
pthread_cond_broadcast(&cond_var);
} else {
while (pthread_cond_wait(&cond_var, &mutex) != 0);
}
pthread_mutex_unlock(&mutex);
merge(sortingArray, myLeft, myRight, nextThreadRight);
sleep(2); // <- sleep
myRight = nextThreadRight;
iter = iter * 2;
}
if (myRank == 0 && nextThread < threads-1) {
int nextThreadRight = threads * ((float)size / (float)threads) - 1;
merge(sortingArray, myLeft, myRight, nextThreadRight);
}
}
void *sortManager(void *threadInfo) {
/* Manage mergeSort between threads */
struct ThreadTask *currentTask = threadInfo;
// Get task arguments
long rank = currentTask->rank;
int left= rank * ((float)currentTask->size / (float)currentTask->threads);
int right = (rank + 1) * ((float)currentTask->size / (float)currentTask->threads) - 1;
int mid = left + (right - left) / 2;
// Execute merge for task division
if (left < right) {
mergeSort(sortingArray, left, mid);
mergeSort(sortingArray, mid + 1, right);
merge(sortingArray, left, mid, right);
}
// Merge thread results
if (rank % 2 == 0) {
mergeThreadResults(rank, left, right, currentTask->size, currentTask->threads);
}
return 0;
}
struct ThreadTask *threadCreator(int size, int threads, pthread_t *thread_handles, struct ThreadTask *tasksHolder) {
    /*
     * Fill tasksHolder[0..threads-1] and start one sortManager thread per entry,
     * storing its handle in thread_handles[i]. Each thread receives a pointer to
     * its own tasksHolder entry, so tasksHolder must outlive the threads.
     * Returns tasksHolder. Exits the process if a thread cannot be created;
     * otherwise the caller would later join a handle that was never set.
     */
    for (long thread = 0; thread < threads; thread++) {
        struct ThreadTask *task = &tasksHolder[thread];
        task->rank = thread;
        task->size = size;
        task->threads = threads;
        int rc = pthread_create(&thread_handles[thread], NULL, sortManager, task);
        if (rc != 0) {
            fprintf(stderr, "Fatal: failed to create thread %ld: %s\n", thread, strerror(rc));
            exit(EXIT_FAILURE);
        }
    }
    return tasksHolder;
}
void printArray(int arr[], int size) {
    /* Write arr[0..size-1] to stdout on one line, each value followed by a space. */
    int idx = 0;
    while (idx < size) {
        printf("%d ", arr[idx]);
        idx++;
    }
    printf("\n");
}
int main(int argc, char *argv[]) {
    /* Print sortingArray, sort it with totalThreads worker threads, print it again. */
    (void)argc;
    (void)argv;
    // Derived from the array definition instead of repeating its length.
    int arraySize = (int)(sizeof sortingArray / sizeof sortingArray[0]);
    int totalThreads = 16;

    printf("\nInput array:\n");
    printArray(sortingArray, arraySize);

    pthread_t *thread_handles = malloc((size_t)totalThreads * sizeof *thread_handles);
    if (thread_handles == NULL) {
        fprintf(stderr, "Fatal: failed to allocate memory for thread handles.\n");
        return EXIT_FAILURE;
    }

    // Each thread keeps a pointer to its entry here; the array stays alive
    // until every thread has been joined below. The return value is not
    // assigned back: copying element 0 onto itself while thread 0 may be
    // reading it would be a data race.
    struct ThreadTask threadTasksHolder[totalThreads];
    threadCreator(arraySize, totalThreads, thread_handles, threadTasksHolder);

    // Wait for every worker before reading sortingArray.
    for (long thread = 0; thread < totalThreads; thread++) {
        pthread_join(thread_handles[thread], NULL);
    }
    free(thread_handles);

    printf("\nSorted array:\n");
    printArray(sortingArray, arraySize);
    return 0;
}