0

I'm pretty new to C, so I'm not sure where even to start digging about my problem. I'm trying to port python number-crunching algos to C, and since there's no GIL in C (woohoo), I can change whatever I want in memory from threads, for as long as I make sure there are no races.

I did my homework on mutexes, however, I cannot wrap my head around use of mutexes in case of continuously running threads accessing the same array over and over.

I'm using p_threads in order to split the workload on a big array a[N]. Number crunching algorithm on array a[N] is additive, so I'm splitting it using a_diff[N_THREADS][N] array, writing changes to be applied to a[N] array from each thread to a_diff[N_THREADS][N] and then merging them all together after each step.

I need to run the crunching on different versions of array a[N], so I pass them via global pointer p (in the MWE, there's only one a[N])

I'm synchronizing threads using another global array SYNC_THREADS[N_THREADS] and make sure threads quit when I need them to by setting END_THREADS global (I know, I'm using too many globals - I don't care, code is ~200 lines). My question is in regard to this synchronization technique - is it safe to do so and what is cleaner/better/faster way to achieve that?

MWEe:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define N_THREADS 3
#define N 10000000
#define STEPS 3

double a[N];  // main array
double a_diff[N_THREADS][N];  // diffs array
double params[N];  // parameter used for number-crunching
double (*p)[N];  // pointer to array[N]

// structure for bounds for crunching the array
struct bounds {
    int lo;
    int hi;
    int thread_num;
};
struct bounds B[N_THREADS];
int SYNC_THREADS[N_THREADS];  // for syncing threads
int END_THREADS = 0;  // signal to terminate threads


static void *crunching(void *arg) {
    // multiple threads run number-crunching operations according to assigned low/high bounds
    struct bounds *data = (struct bounds *)arg;
    int lo = (*data).lo;
    int hi = (*data).hi;
    int thread_num = (*data).thread_num;
    printf("worker %d started for bounds [%d %d] \n", thread_num, lo, hi);

    int i;

    while (END_THREADS != 1) {  // END_THREADS tells threads to terminate
        if (SYNC_THREADS[thread_num] == 1) {  // SYNC_THREADS allows threads to start number-crunching
            printf("worker %d working... \n", thread_num );
            for (i = lo; i <= hi; ++i) {
                a_diff[thread_num][i] += (*p)[i] * params[i];  // pretend this is an expensive operation...
            }
            SYNC_THREADS[thread_num] = 0;  // thread disables itself until SYNC_THREADS is back to 1
            printf("worker %d stopped... \n", thread_num );
        }
    }
    return 0;
}


int i, j, th,s;
double joiner;

int main() {
    // pre-fill arrays
    for (i = 0; i < N; ++i) {
        a[i] = i + 0.5;
        params[i] = 0.0;
    }

    // split workload between workers
    int worker_length = N / N_THREADS;
    for (i = 0; i < N_THREADS; ++i) {
        B[i].thread_num = i;
        B[i].lo = i * worker_length;
        if (i == N_THREADS - 1) {
            B[i].hi = N;
        } else {
            B[i].hi = i * worker_length + worker_length - 1;
        }
    }
    // pointer to parameters to be passed to worker
    struct bounds **data = malloc(N_THREADS * sizeof(struct bounds*));
    for (i = 0; i < N_THREADS; i++) {
        data[i] = malloc(sizeof(struct bounds));
        data[i]->lo = B[i].lo;
        data[i]->hi = B[i].hi;
        data[i]->thread_num = B[i].thread_num;
    }
    // create thread objects
    pthread_t threads[N_THREADS];

    // disallow threads to crunch numbers
    for (th = 0; th < N_THREADS; ++th) {
        SYNC_THREADS[th] = 0;
    }

    // launch workers
    for(th = 0; th < N_THREADS; th++) {
        pthread_create(&threads[th], NULL, crunching, data[th]);
    }

    // big loop of iterations
    for (s = 0; s < STEPS; ++s) {
        for (i = 0; i < N; ++i) {
            params[i] += 1.0;  // adjust parameters

            // zero diff array
            for (i = 0; i < N; ++i) {
                for (th = 0; th < N_THREADS; ++th) {
                    a_diff[th][i] = 0.0;
                }
            }
            p = &a;  // pointer to array a
            // allow threads to process numbers and wait for threads to complete
            for (th = 0; th < N_THREADS; ++th) { SYNC_THREADS[th] = 1; }
            // ...here threads started by pthread_create do calculations...
            for (th = 0; th < N_THREADS; th++) { while (SYNC_THREADS[th] != 0) {} }

            // join results from threads (number-crunching is additive)
            for (i = 0; i < N; ++i) {
                joiner = 0.0;
                for (th = 0; th < N_THREADS; ++th) {
                    joiner += a_diff[th][i];
                }
                a[i] += joiner;
            }
        }
    }


    // join workers
    END_THREADS = 1;
    for(th = 0; th < N_THREADS; th++) {
        pthread_join(threads[th], NULL);
    }

    return 0;
}

I see that workers don't overlap time-wise:

worker 0 started for bounds [0 3333332]
worker 1 started for bounds [3333333 6666665]
worker 2 started for bounds [6666666 10000000]
worker 0 working...
worker 1 working...
worker 2 working...
worker 2 stopped...
worker 0 stopped...
worker 1 stopped...
worker 2 working...
worker 0 working...
worker 1 working...
worker 1 stopped...
worker 0 stopped...
worker 2 stopped...
worker 2 working...
worker 0 working...
worker 1 working...
worker 1 stopped...
worker 2 stopped...
worker 0 stopped...

Process returned 0 (0x0)   execution time : 1.505 s

and I make sure worker's don't get into each other workspaces by separating them via a_diff[thead_num][N] sub-arrays, however, I'm not sure that's always the case and that I'm not introducing hidden races somewhere...

nagimov
  • 130
  • 1
  • 4
  • I suggest asking https://codereview.stackexchange.com/ since your question seems to be one of best practice, which can be subjective. – e0k May 09 '18 at 22:24
  • 1
    Tip: you can take a look at conditional variables (`pthread_cond`) to replace your END_THREADS and SYNC_THREADS active-wait mechanism: https://stackoverflow.com/questions/20772476/when-to-use-pthread-conditional-variables – João Neto May 10 '18 at 00:48

1 Answers1

1

I didn't realize what was the question :-)

So, the question is if you're thinking well with your SYNC_THREADS and END_THREADS synchronization mechanism.
Yes!... Almost. The problem is that threads are burning CPU while waiting.

Conditional Variables

To make threads wait for an event you have conditional variables (pthread_cond). These offer a few useful functions like wait(), signal() and broadcast():

  • wait(&cond, &m) blocks a thread in a given condition variable. [note 2]
  • signal(&cond) unlocks a thread waiting in a given condition variable.
  • broadcast(&cond) unlocks all threads waiting in a given condition variable.

Initially you'd have all the threads waiting [note 1]:

while(!start_threads)
  pthread_cond_wait(&cond_start);

And, when the main thread is ready:

start_threads = 1;
pthread_cond_broadcast(&cond_start);

Barriers

If you have data dependencies between iterations, you'd want to make sure threads are executing the same iteration at any given moment.

To synchronize threads at the end of each iteration, you'll want to take a look at barriers (pthread_barrier):

  • pthread_barrier_init(count): initializes a barrier to synchronize count threads.
  • pthread_barrier_wait(): thread waits here until all count threads reach the barrier.

Extending functionality of barriers

Sometimes you'll want the last thread reaching a barrier to compute something (e.g. to increment the counter of number of iterations, or to compute some global value, or to check if execution should stop). You have two alternatives

Using pthread_barriers

You'll need to essentially have two barriers:

int rc = pthread_barrier_wait(&b);
if(rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD)
  if(shouldStop()) stop = 1;
pthread_barrier_wait(&b);
if(stop) return;

Using pthread_conds to implement our own specialized barrier

pthread_mutex_lock(&mutex)
remainingThreads--;
// all threads execute this
executedByAllThreads();
if(remainingThreads == 0) {
  // reinitialize barrier
  remainingThreads = N;
  // only last thread executes this
  if(shouldStop()) stop = 1;
  pthread_cond_broadcast(&cond);
} else {
while(remainingThreads > 0)
  pthread_cond_wait(&cond, &mutex);
}
pthread_mutex_unlock(&mutex);

Note 1: why is pthread_cond_wait() inside a while block? May seem a bit odd. The reason behind it is due to the existence of spurious wakeups. The function may return even if no signal() or broadcast() was issued. So, just to guarantee correctness, it's usual to have an extra variable to guarantee that if a thread suddenly wakes up before it should, it runs back into the pthread_cond_wait().

From the manual:

When using condition variables there is always a Boolean predicate involving shared variables associated with each condition wait that is true if the thread should proceed. Spurious wakeups from the pthread_cond_timedwait() or pthread_cond_wait() functions may occur. Since the return from pthread_cond_timedwait() or pthread_cond_wait() does not imply anything about the value of this predicate, the predicate should be re-evaluated upon such return.

(...)

If a signal is delivered to a thread waiting for a condition variable, upon return from the signal handler the thread resumes waiting for the condition variable as if it was not interrupted, or it shall return zero due to spurious wakeup.

Note 2:

An noted by Michael Burr in the comments, you should hold a companion lock whenever you modify the predicate (start_threads) and pthread_cond_wait(). pthread_cond_wait() will release the mutex when called; and re-acquires it when it returns.

PS: It's a bit late here; sorry if my text is confusing :-)

Community
  • 1
  • 1
João Neto
  • 1,732
  • 17
  • 28
  • 2
    Another very important part of the contract for using condition variables is that the mutex that controls the modification of the predicate (`start_threads` in the example) *must* be held while checking the predicate and calling `pthread_cond_wait()`. That is why `pthread_cond_wait()` take a pointer to a mutex - so it can release the mutex at the precisely correct time. – Michael Burr May 10 '18 at 01:16
  • Well-said! Forgot that detail :) – João Neto May 10 '18 at 01:33
  • Thanks! This is awesome! I guess I didn't do my homework on mutexes thoroughly enough :) – nagimov May 11 '18 at 03:42