I'm familiar with multithreading and I've developed many multithreaded programs in Java and Objective-C successfully. But I couldn't achieve the following in C using pthreads without using a join from the main thread:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_OF_THREADS 2

/* Work assignment for one worker: it updates arr[start] .. arr[end], both inclusive. */
struct thread_data {
    int start, end;
    int *arr;
};

/* Prints ints[0..n-1] as "[a, b, ...]" followed by a newline. */
void print(int *ints, int n);
/* Thread start routine; args must point to a struct thread_data. */
void *processArray(void *args);
/*
 * Fills an array with 0..numOfInts-1, lets NUM_OF_THREADS workers increment
 * disjoint slices of it, and prints the array before and after.
 */
int main(int argc, const char * argv[])
{
    int numOfInts = 10;
    int *ints = malloc(numOfInts * sizeof *ints);
    if (ints == NULL) {
        perror("malloc");
        return EXIT_FAILURE;
    }
    for (int i = 0; i < numOfInts; i++) {
        ints[i] = i;
    }
    print(ints, numOfInts); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    pthread_t threads[NUM_OF_THREADS];
    struct thread_data thread_data[NUM_OF_THREADS];
    int created = 0;

    // Split the indices into NUM_OF_THREADS contiguous, nearly equal, inclusive
    // ranges; a range is empty (end < start) if there are more threads than ints.
    int remainingWork = numOfInts;
    int endRange = -1;
    for (int i = 0; i < NUM_OF_THREADS; i++) {
        int amountOfWork = remainingWork / (NUM_OF_THREADS - i);
        int startRange = endRange + 1;
        endRange = startRange + amountOfWork - 1;
        thread_data[i].arr = ints;
        thread_data[i].start = startRange;
        thread_data[i].end = endRange;
        int rc = pthread_create(&threads[i], NULL, processArray, &thread_data[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create failed (error %d)\n", rc);
            break;
        }
        created++;
        remainingWork -= amountOfWork;
    }

    // 1. In this one-shot version each worker starts as soon as it is created.
    // 2. Wait for them to finish. Without this wait the print below races with
    //    the workers, and free() could release ints while they still write to it.
    //    Joining is the simplest correct wait for threads that run once; a pool
    //    reused for many rounds needs a start/done handshake instead.
    for (int i = 0; i < created; i++) {
        pthread_join(threads[i], NULL);
    }
    if (created != NUM_OF_THREADS) {
        free(ints);
        return EXIT_FAILURE;
    }

    print(ints, numOfInts); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    free(ints);
    return 0;
}
void *processArray(void *args)
{
struct thread_data *data = (struct thread_data *)args;
int *arr = data->arr;
int start = data->start;
int end = data->end;
// 1. Wait for a signal to start from the main thread
for (int i = start; i <= end; i++) {
arr[i] = arr[i] + 1;
}
// 2. Signal to the main thread that you're done
pthread_exit(NULL);
}
/* Writes ints[0..n-1] to stdout as "[a, b, c]" followed by a newline. */
void print(int *ints, int n)
{
    printf("[");
    for (int k = 0; k < n; k++) {
        if (k > 0)
            printf(", ");
        printf("%d", ints[k]);
    }
    printf("]\n");
}
I would like to achieve the following in the above code:
In main():
- Signal to the threads to start working.
- Wait for the background threads to finish.
In processArray():
- Wait for a signal to start from the main thread
- Signal to the main thread that you're done
I don't want to use a join in the main thread because, in the real application, the main thread will create the threads once and then signal the background threads to work many times, and it must not proceed until all the background threads have finished working.
In the processArray function, I will put an infinite loop as follows:
/* Sketch of a persistent worker: repeatedly increments its inclusive slice of arr. */
void *processArray(void *args)
{
    struct thread_data *job = args;
    for (;;) {
        // 1. Wait for a signal to start from the main thread
        int *values = job->arr;
        const int first = job->start;
        const int last = job->end;
        // Process
        for (int idx = first; idx <= last; ++idx) {
            values[idx] += 1;
        }
        // 2. Signal to the main thread that you're done
    }
    pthread_exit(NULL); /* unreachable: the loop above never exits */
}
Note that I'm new to C and the POSIX API, so excuse me if I'm missing something obvious. I really tried many things — a mutex, an array of semaphores, and a mixture of both — but without success. I think a condition variable may help, but I couldn't understand how it could be used.
Thanks for your time.
Problem Solved:
Thank you guys so much! I was finally able to get this to work safely and without using a join by following your tips. Although the solution is somewhat ugly, it gets the job done, and the performance gain is worth it (as you'll see below). For anyone interested, this is a simulation of the real application I'm working on, in which the main thread keeps giving work continuously to the background threads:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_OF_THREADS 5

/* Per-worker record: an id plus the inclusive index range [start, end] of arr it updates. */
struct thread_data {
    int id;
    int start, end;
    int *arr;
};
/*
 * Shared state for the round handshake between main and the workers.
 * Each counter/flag is paired with the mutex and condition variable
 * declared next to it.
 */

/* Workers that have reported ready for the next round; main waits for NUM_OF_THREADS. */
pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t currentlyIdleCond = PTHREAD_COND_INITIALIZER;
int currentlyIdle;

/* Set by main to release the workers into a round of work. */
pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t workReadyCond = PTHREAD_COND_INITIALIZER;
int workReady;
/* Set by main (under workReadyMutex) to make the workers return instead of working. */
int shutdownRequested;

/* Workers still processing the current round; main waits for 0. */
pthread_cond_t currentlyWorkingCond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t currentlyWorkingMutex = PTHREAD_MUTEX_INITIALIZER;
int currentlyWorking;

/* Set by main to let the workers leave the current round and report idle again. */
pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t canFinishCond = PTHREAD_COND_INITIALIZER;
int canFinish;

void print(int *ints, int n);
void *processArray(void *args);
int validateResult(int *ints, int num, int start);

/*
 * Asks the workers threads[0..count-1] to return and joins them.
 * Safe whenever main is not in the middle of a round (workReady == 0): every
 * worker eventually reaches the workReady wait, sees shutdownRequested and
 * returns. Unlike pthread_cancel, no worker exits while holding a mutex.
 */
static void stopWorkers(pthread_t *threads, int count)
{
    pthread_mutex_lock(&workReadyMutex);
    shutdownRequested = 1;
    pthread_cond_broadcast(&workReadyCond);
    pthread_mutex_unlock(&workReadyMutex);
    for (int i = 0; i < count; i++) {
        pthread_join(threads[i], NULL);
    }
}

/*
 * Creates NUM_OF_THREADS persistent workers once, drives them through many
 * rounds in which each increments its slice of ints, then validates the result.
 */
int main(int argc, const char * argv[])
{
    int numOfInts = 10;
    int *ints = malloc(numOfInts * sizeof *ints);
    if (ints == NULL) {
        perror("malloc");
        return EXIT_FAILURE;
    }
    for (int i = 0; i < numOfInts; i++) {
        ints[i] = i;
    }
    pthread_t threads[NUM_OF_THREADS];
    struct thread_data thread_data[NUM_OF_THREADS];
    workReady = 0;
    canFinish = 0;
    currentlyIdle = 0;
    currentlyWorking = 0;
    shutdownRequested = 0;

    // Split the indices into NUM_OF_THREADS contiguous, nearly equal, inclusive ranges.
    int remainingWork = numOfInts;
    int endRange = -1;
    for (int i = 0; i < NUM_OF_THREADS; i++) {
        int amountOfWork = remainingWork / (NUM_OF_THREADS - i);
        int startRange = endRange + 1;
        endRange = startRange + amountOfWork - 1;
        thread_data[i].id = i;
        thread_data[i].arr = ints;
        thread_data[i].start = startRange;
        thread_data[i].end = endRange;
        int rc = pthread_create(&threads[i], NULL, processArray, &thread_data[i]);
        if (rc != 0) {
            // The round loop below would wait forever for NUM_OF_THREADS idle workers.
            fprintf(stderr, "pthread_create failed (error %d)\n", rc);
            stopWorkers(threads, i);
            free(ints);
            return EXIT_FAILURE;
        }
        remainingWork -= amountOfWork;
    }

    int loops = 1111111;
    int expectedStartingValue = ints[0] + loops; // used to validate the results
    // The elements in ints[] should be incremented by 1 in each loop
    while (loops-- != 0) {
        // Wait until every worker has reported idle for this round.
        pthread_mutex_lock(&currentlyIdleMutex);
        while (currentlyIdle != NUM_OF_THREADS) {
            pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex);
        }
        pthread_mutex_unlock(&currentlyIdleMutex);

        // Every worker has left its canFinish wait (it incremented currentlyIdle
        // afterwards). It will not touch canFinish or currentlyWorking again until
        // it observes workReady, which is published under workReadyMutex below,
        // so these plain writes are ordered before any worker reads them.
        canFinish = 0;
        currentlyWorking = NUM_OF_THREADS;

        // Signal to the threads to start
        pthread_mutex_lock(&workReadyMutex);
        workReady = 1;
        pthread_cond_broadcast(&workReadyCond);
        pthread_mutex_unlock(&workReadyMutex);

        // Wait for them to finish
        pthread_mutex_lock(&currentlyWorkingMutex);
        while (currentlyWorking != 0) {
            pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex);
        }
        pthread_mutex_unlock(&currentlyWorkingMutex);

        // Every worker is past its workReady check and heading to the canFinish
        // wait. It will not read workReady or currentlyIdle again until canFinish
        // is published under canFinishMutex below. Clearing workReady prevents
        // the workers from starting another round on their own.
        workReady = 0;
        currentlyIdle = 0;

        // Allow them to finish the round
        pthread_mutex_lock(&canFinishMutex);
        canFinish = 1;
        pthread_cond_broadcast(&canFinishCond);
        pthread_mutex_unlock(&canFinishMutex);
    }

    // Stop and join every worker before reading ints for validation or freeing it.
    stopWorkers(threads, NUM_OF_THREADS);

    if (validateResult(ints, numOfInts, expectedStartingValue)) {
        printf("Result correct.\n");
    }
    else {
        printf("Result invalid.\n");
    }
    free(ints);
    return 0;
}

/*
 * Persistent worker. Each round it reports idle, waits for workReady (or for
 * shutdownRequested, in which case it returns), increments its inclusive slice
 * arr[start..end], reports completion, and waits for canFinish.
 */
void *processArray(void *args)
{
    struct thread_data *data = args;
    int *arr = data->arr;
    int start = data->start;
    int end = data->end;
    for (;;) {
        // Set yourself as idle and signal to main; main starts a round when all are idle.
        pthread_mutex_lock(&currentlyIdleMutex);
        currentlyIdle++;
        pthread_cond_signal(&currentlyIdleCond);
        pthread_mutex_unlock(&currentlyIdleMutex);

        // Wait for work (or for a shutdown request) from main.
        pthread_mutex_lock(&workReadyMutex);
        while (!workReady && !shutdownRequested) {
            pthread_cond_wait(&workReadyCond, &workReadyMutex);
        }
        int stop = shutdownRequested;
        pthread_mutex_unlock(&workReadyMutex);
        if (stop) {
            break;
        }

        // Do the work
        for (int i = start; i <= end; i++) {
            arr[i] = arr[i] + 1;
        }

        // Mark yourself as finished and signal to main.
        pthread_mutex_lock(&currentlyWorkingMutex);
        currentlyWorking--;
        pthread_cond_signal(&currentlyWorkingCond);
        pthread_mutex_unlock(&currentlyWorkingMutex);

        // Wait for permission to finish the round.
        pthread_mutex_lock(&canFinishMutex);
        while (!canFinish) {
            pthread_cond_wait(&canFinishCond, &canFinishMutex);
        }
        pthread_mutex_unlock(&canFinishMutex);
    }
    return NULL;
}
/*
 * Returns 1 when ints[i] == start + i for every i in [0, n), otherwise 0.
 * With n <= 0 there is nothing to check and the result is 1.
 */
int validateResult(int *ints, int n, int start)
{
    int i = 0;
    while (i < n) {
        if (ints[i] != start + i)
            return 0;
        i++;
    }
    return 1;
}
/* Writes ints[0..n-1] to stdout as "[a, b, c]" followed by a newline. */
void print(int *ints, int n)
{
    printf("[");
    for (int k = 0; k < n; k++) {
        if (k > 0)
            printf(", ");
        printf("%d", ints[k]);
    }
    printf("]\n");
}
I'm not sure, though, whether pthread_cancel
is enough for cleanup! As for the barrier, it would have been a great help if it weren't limited to some OSs, as mentioned by @Jeremy.
Benchmarks:
I wanted to make sure that all these condition variables aren't actually slowing down the algorithm, so I've set up this benchmark to compare the two solutions:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/resource.h>
#define NUM_OF_THREADS 5

/* Work assignment for one worker: it updates arr[start] .. arr[end], both inclusive. */
struct thread_data {
    int start, end;
    int *arr;
};
/*
 * Round handshake between doItWithMutex (main thread) and the processArrayMutex
 * workers. Each counter/flag is paired with the mutex and condition variable
 * declared beside it.
 */

/* Workers that have reported ready for the next round. */
int currentlyIdle;
pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t currentlyIdleCond = PTHREAD_COND_INITIALIZER;

/* Set by the main thread to release the workers into a round. */
int workReady;
pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t workReadyCond = PTHREAD_COND_INITIALIZER;

/* Workers still processing the current round. */
int currentlyWorking;
pthread_mutex_t currentlyWorkingMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t currentlyWorkingCond = PTHREAD_COND_INITIALIZER;

/* Set by the main thread to let the workers leave the current round. */
int canFinish;
pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t canFinishCond = PTHREAD_COND_INITIALIZER;

void *processArrayMutex(void *args);
void *processArrayJoin(void *args);
double doItWithMutex(pthread_t *threads, struct thread_data *data, int num_loops);
double doItWithJoin(pthread_t *threads, struct thread_data *data, int num_loops);
/*
 * Benchmarks two ways of running NUM_OF_THREADS workers over an array many
 * times:
 *   - creating and joining fresh threads for every round (doItWithJoin);
 *   - signalling one persistent set of threads (doItWithMutex).
 * It prints the average time of each and the relative speedup.
 */
int main(int argc, const char * argv[])
{
    int numOfInts = 10;
    int *join_ints = malloc(numOfInts * sizeof *join_ints);
    int *mutex_ints = malloc(numOfInts * sizeof *mutex_ints);
    if (join_ints == NULL || mutex_ints == NULL) {
        perror("malloc");
        free(join_ints);
        free(mutex_ints);
        return EXIT_FAILURE;
    }
    for (int i = 0; i < numOfInts; i++) {
        join_ints[i] = i;
        mutex_ints[i] = i;
    }
    pthread_t join_threads[NUM_OF_THREADS];
    pthread_t mutex_threads[NUM_OF_THREADS];
    struct thread_data join_thread_data[NUM_OF_THREADS];
    struct thread_data mutex_thread_data[NUM_OF_THREADS];
    workReady = 0;
    canFinish = 0;
    currentlyIdle = 0;
    currentlyWorking = 0;

    // Give both variants the same contiguous, nearly equal, inclusive ranges.
    // Only the persistent (mutex) workers are created here; doItWithJoin
    // creates its own threads every round.
    int remainingWork = numOfInts;
    int endRange = -1;
    for (int i = 0; i < NUM_OF_THREADS; i++) {
        int amountOfWork = remainingWork / (NUM_OF_THREADS - i);
        int startRange = endRange + 1;
        endRange = startRange + amountOfWork - 1;
        join_thread_data[i].arr = join_ints;
        join_thread_data[i].start = startRange;
        join_thread_data[i].end = endRange;
        mutex_thread_data[i].arr = mutex_ints;
        mutex_thread_data[i].start = startRange;
        mutex_thread_data[i].end = endRange;
        int rc = pthread_create(&mutex_threads[i], NULL, processArrayMutex, &mutex_thread_data[i]);
        if (rc != 0) {
            // doItWithMutex would wait forever for NUM_OF_THREADS idle workers.
            // Workers created so far only wait for workReady and never touch
            // the arrays, so freeing them before exiting is safe.
            fprintf(stderr, "pthread_create failed (error %d)\n", rc);
            free(join_ints);
            free(mutex_ints);
            return EXIT_FAILURE;
        }
        remainingWork -= amountOfWork;
    }

    int numOfBenchmarkTests = 100;
    int numberOfLoopsPerTest = 1000;
    double join_sum = 0.0, mutex_sum = 0.0;
    for (int i = 0; i < numOfBenchmarkTests; i++) {
        double joinTime = doItWithJoin(join_threads, join_thread_data, numberOfLoopsPerTest);
        double mutexTime = doItWithMutex(mutex_threads, mutex_thread_data, numberOfLoopsPerTest);
        join_sum += joinTime;
        mutex_sum += mutexTime;
    }
    double join_avg = join_sum / numOfBenchmarkTests;
    double mutex_avg = mutex_sum / numOfBenchmarkTests;
    printf("Join average : %f\n", join_avg);
    printf("Mutex average: %f\n", mutex_avg);

    // Report the share of the slower variant's time that the faster one saves.
    // diff is negative when Join wins, so it is negated to print a positive figure.
    double diff = join_avg - mutex_avg;
    if (diff > 0.0) {
        printf("Mutex is %.0f%% faster.\n", 100 * diff / join_avg);
    }
    else if (diff < 0.0) {
        printf("Join is %.0f%% faster.\n", 100 * -diff / mutex_avg);
    }
    else {
        printf("Both have the same performance.\n");
    }

    // The processArrayMutex workers never return. After the last round they are
    // parked waiting for workReady and do not touch the arrays again; they are
    // reclaimed when the process exits.
    free(join_ints);
    free(mutex_ints);
    return 0;
}
// From https://stackoverflow.com/a/2349941/408286
/*
 * Returns the current wall-clock time in seconds, with microsecond resolution,
 * as a double. Only the difference between two calls is meaningful here.
 * POSIX leaves gettimeofday's behavior unspecified when its second argument
 * is not a null pointer, so no timezone is requested.
 * NOTE(review): wall-clock time can jump (NTP, manual changes);
 * clock_gettime(CLOCK_MONOTONIC, ...) is a better interval timer where available.
 */
double get_time(void)
{
    struct timeval t;
    gettimeofday(&t, NULL);
    return t.tv_sec + t.tv_usec * 1e-6;
}
double doItWithMutex(pthread_t *threads, struct thread_data *data, int num_loops)
{
double start = get_time();
int loops = num_loops;
while (loops-- != 0) {
// Make sure all of them are ready
pthread_mutex_lock(¤tlyIdleMutex);
while (currentlyIdle != NUM_OF_THREADS) {
pthread_cond_wait(¤tlyIdleCond, ¤tlyIdleMutex);
}
pthread_mutex_unlock(¤tlyIdleMutex);
// All threads are now blocked; it's safe to not lock the mutex.
// Prevent them from finishing before authorized.
canFinish = 0;
// Reset the number of currentlyWorking threads
currentlyWorking = NUM_OF_THREADS;
// Signal to the threads to start
pthread_mutex_lock(&workReadyMutex);
workReady = 1;
pthread_cond_broadcast(&workReadyCond );
pthread_mutex_unlock(&workReadyMutex);
// Wait for them to finish
pthread_mutex_lock(¤tlyWorkingMutex);
while (currentlyWorking != 0) {
pthread_cond_wait(¤tlyWorkingCond, ¤tlyWorkingMutex);
}
pthread_mutex_unlock(¤tlyWorkingMutex);
// The threads are now waiting for permission to finish
// Prevent them from starting again
workReady = 0;
currentlyIdle = 0;
// Allow them to finish
pthread_mutex_lock(&canFinishMutex);
canFinish = 1;
pthread_cond_broadcast(&canFinishCond);
pthread_mutex_unlock(&canFinishMutex);
}
return get_time() - start;
}
/*
 * Runs num_loops rounds in which NUM_OF_THREADS fresh processArrayJoin threads
 * are created (thread i gets &data[i]) and then joined. Returns the elapsed
 * wall-clock seconds. If a thread cannot be created, the threads already
 * started are joined and the program exits with EXIT_FAILURE.
 */
double doItWithJoin(pthread_t *threads, struct thread_data *data, int num_loops)
{
    double start = get_time();
    for (int round = 0; round < num_loops; round++) {
        // create them
        int created = 0;
        for (int i = 0; i < NUM_OF_THREADS; i++) {
            int rc = pthread_create(&threads[i], NULL, processArrayJoin, &data[i]);
            if (rc != 0) {
                fprintf(stderr, "pthread_create failed (error %d)\n", rc);
                break;
            }
            created++;
        }
        // wait; only join handles that pthread_create actually initialized
        for (int i = 0; i < created; i++) {
            pthread_join(threads[i], NULL);
        }
        if (created != NUM_OF_THREADS) {
            exit(EXIT_FAILURE);
        }
    }
    return get_time() - start;
}
void *processArrayMutex(void *args)
{
struct thread_data *data = (struct thread_data *)args;
int *arr = data->arr;
int start = data->start;
int end = data->end;
while (1) {
// Set yourself as idle and signal to the main thread, when all threads are idle main will start
pthread_mutex_lock(¤tlyIdleMutex);
currentlyIdle++;
pthread_cond_signal(¤tlyIdleCond);
pthread_mutex_unlock(¤tlyIdleMutex);
// wait for work from main
pthread_mutex_lock(&workReadyMutex);
while (!workReady) {
pthread_cond_wait(&workReadyCond , &workReadyMutex);
}
pthread_mutex_unlock(&workReadyMutex);
// Do the work
for (int i = start; i <= end; i++) {
arr[i] = arr[i] + 1;
}
// mark yourself as finished and signal to main
pthread_mutex_lock(¤tlyWorkingMutex);
currentlyWorking--;
pthread_cond_signal(¤tlyWorkingCond);
pthread_mutex_unlock(¤tlyWorkingMutex);
// Wait for permission to finish
pthread_mutex_lock(&canFinishMutex);
while (!canFinish) {
pthread_cond_wait(&canFinishCond , &canFinishMutex);
}
pthread_mutex_unlock(&canFinishMutex);
}
pthread_exit(NULL);
}
void *processArrayJoin(void *args)
{
struct thread_data *data = (struct thread_data *)args;
int *arr = data->arr;
int start = data->start;
int end = data->end;
// Do the work
for (int i = start; i <= end; i++) {
arr[i] = arr[i] + 1;
}
pthread_exit(NULL);
}
And the output is:
Join average : 0.153074
Mutex average: 0.071588
Mutex is 53% faster.
Thank you again. I really appreciate your help!