First, a summary:
pthread_mutex_lock(&mutex): If mutex is free, then this thread grabs it immediately. If mutex is grabbed, then this thread waits until the mutex becomes free, and then grabs it.
pthread_mutex_trylock(&mutex): If mutex is free, then this thread grabs it. If mutex is grabbed, then the call returns immediately with EBUSY.
pthread_mutex_unlock(&mutex): Releases mutex.
pthread_cond_signal(&cond): Wake up one thread waiting on the condition variable cond.
pthread_cond_broadcast(&cond): Wake up all threads waiting on the condition variable cond.
pthread_cond_wait(&cond, &mutex): This must be called with mutex grabbed. The calling thread will temporarily release mutex and wait on cond. When cond is broadcast on, or signaled on and this thread happens to be the one woken up, the calling thread will first re-grab mutex, and then return from the call. (POSIX also permits spurious wakeups, so the thread should recheck whatever it was waiting for after the call returns.)
It is important to note that releasing mutex and starting to wait on cond happen as a single atomic step. There is no interval in between during which another thread could grab mutex and signal or broadcast on cond without this thread being counted as a waiter.
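The usual way to use pthread_cond_wait() is in a loop that rechecks a condition guarded by the same mutex. The following is only a minimal sketch of that pattern; struct gate and the gate_*() functions are hypothetical names made up for this illustration, not part of pthreads or of OP's code.

```c
#include <pthread.h>

/* Hypothetical shared state: a flag guarded by a mutex and a condition variable. */
struct gate {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int             open;   /* The condition itself; only accessed with lock grabbed */
};

/* Block until the gate has been opened. */
static void gate_wait(struct gate *g)
{
    pthread_mutex_lock(&g->lock);
    /* Recheck the flag after every wakeup, since wakeups may be spurious. */
    while (!g->open)
        pthread_cond_wait(&g->cond, &g->lock);
    pthread_mutex_unlock(&g->lock);
}

/* Open the gate and wake up every thread waiting on it. */
static void gate_open(struct gate *g)
{
    pthread_mutex_lock(&g->lock);
    g->open = 1;
    pthread_cond_broadcast(&g->cond);
    pthread_mutex_unlock(&g->lock);
}

/* Thread function: just wait for the gate. */
static void *gate_waiter(void *arg)
{
    gate_wait(arg);
    return NULL;
}

/* Start one waiter, open the gate, and reap the waiter.
   Returns 0 if the waiter was released and joined successfully. */
static int gate_demo(void)
{
    struct gate g;
    pthread_t   id;
    int         err;

    pthread_mutex_init(&g.lock, NULL);
    pthread_cond_init(&g.cond, NULL);
    g.open = 0;

    if (pthread_create(&id, NULL, gate_waiter, &g))
        return -1;

    gate_open(&g);
    err = pthread_join(id, NULL);

    pthread_cond_destroy(&g.cond);
    pthread_mutex_destroy(&g.lock);
    return err;
}
```

Because the flag is set and checked only while the mutex is grabbed, it does not matter whether gate_open() runs before or after the waiter reaches pthread_cond_wait(): in the first case the waiter sees the flag and never waits, in the second it is woken by the broadcast.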
Let's look at a practical, runnable example, written along the lines of OP's code.
First, we'll use a structure to hold the parameters for each worker function. Since we'll want the mutex and the condition variable to be shared between threads, we'll use pointers.
#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <pthread.h>
#include <limits.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
/* Parameters and per-thread state for one worker. */
struct work {
    pthread_t        thread_id;
    pthread_mutex_t *lock;      /* Pointer to the mutex to use */
    pthread_cond_t  *wait;      /* Pointer to the condition variable to use */
    volatile int    *done;      /* Pointer to the flag to check */
    FILE            *out;       /* Stream to output to */
    long             id;        /* Identity of this thread */
    unsigned long    count;     /* Number of times this thread iterated. */
};
The thread worker function receives a pointer to the above structure. Each thread iterates the loop once, then waits on the condition variable. When woken up, if the done flag is still zero, the thread iterates the loop. Otherwise, the thread exits.
/* Example worker function. */
void *worker(void *workptr)
{
    struct work *const work = workptr;

    pthread_mutex_lock(work->lock);

    /* Loop as long as *done == 0: */
    while (!*(work->done)) {
        /* *(work->lock) is ours at this point. */

        /* This is a new iteration. */
        work->count++;

        /* Do the work. */
        fprintf(work->out, "Thread %ld iteration %lu\n", work->id, work->count);
        fflush(work->out);

        /* Wait for wakeup. */
        pthread_cond_wait(work->wait, work->lock);
    }

    /* *(work->lock) is still ours, but we've been told that all work is done already. */
    /* Release the mutex and be done. */
    pthread_mutex_unlock(work->lock);
    return NULL;
}
To run the above, we'll need a main() as well:
#ifndef THREADS
#define THREADS 4
#endif
int main(void)
{
    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  wait = PTHREAD_COND_INITIALIZER;
    volatile int    done = 0;
    struct work     w[THREADS];
    char           *line = NULL, *p;
    size_t          size = 0;
    ssize_t         len = 0;
    unsigned long   total;
    pthread_attr_t  attrs;
    int             i, err;

    /* The worker functions require very little stack, but the default stack
       size is huge. Limit that, to reduce the (virtual) memory use. */
    pthread_attr_init(&attrs);
    pthread_attr_setstacksize(&attrs, 2 * PTHREAD_STACK_MIN);

    /* Grab the mutex so the threads will have to wait to grab it. */
    pthread_mutex_lock(&lock);

    /* Create THREADS worker threads. */
    for (i = 0; i < THREADS; i++) {

        /* All threads use the same mutex, condition variable, and done flag. */
        w[i].lock = &lock;
        w[i].wait = &wait;
        w[i].done = &done;

        /* All threads output to standard output. */
        w[i].out = stdout;

        /* The rest of the fields are thread-specific. */
        w[i].id = i + 1;
        w[i].count = 0;

        /* pthread_create() returns the error number; it does not set errno. */
        err = pthread_create(&(w[i].thread_id), &attrs, worker, (void *)&(w[i]));
        if (err) {
            fprintf(stderr, "Cannot create thread %d of %d: %s.\n", i+1, THREADS, strerror(err));
            exit(EXIT_FAILURE); /* Exits the entire process, killing any other threads as well. */
        }
    }

    /* The attributes are no longer needed once the threads exist. */
    pthread_attr_destroy(&attrs);

    fprintf(stderr, "The first character on each line controls the type of event:\n");
    fprintf(stderr, "    e, q    exit\n");
    fprintf(stderr, "    s       signal\n");
    fprintf(stderr, "    b       broadcast\n");
    fflush(stderr);

    /* Let each thread grab the mutex now. */
    pthread_mutex_unlock(&lock);

    while (1) {
        len = getline(&line, &size, stdin);
        if (len < 1)
            break;

        /* Find the first character on the line, ignoring leading whitespace. */
        p = line;
        while ((p < line + len) && (*p == '\0' || *p == '\t' || *p == '\n' ||
                                    *p == '\v' || *p == '\f' || *p == '\r' || *p == ' '))
            p++;

        /* Do the operation mentioned */
        if (*p == 'e' || *p == 'E' || *p == 'q' || *p == 'Q')
            break;
        else if (*p == 's' || *p == 'S')
            pthread_cond_signal(&wait);
        else if (*p == 'b' || *p == 'B')
            pthread_cond_broadcast(&wait);
    }

    /* The input buffer allocated by getline() is no longer needed. */
    free(line);

    /* It is time for the worker threads to be done. */
    pthread_mutex_lock(&lock);
    done = 1;
    pthread_mutex_unlock(&lock);

    /* To ensure all threads see the state of that flag,
       we wake up all threads by broadcasting on the condition variable. */
    pthread_cond_broadcast(&wait);

    /* Reap all threads. */
    for (i = 0; i < THREADS; i++)
        pthread_join(w[i].thread_id, NULL);

    /* Output the thread statistics. */
    total = 0;
    for (i = 0; i < THREADS; i++) {
        total += w[i].count;
        fprintf(stderr, "Thread %ld: %lu events.\n", w[i].id, w[i].count);
    }
    fprintf(stderr, "Total: %lu events.\n", total);

    return EXIT_SUCCESS;
}
If you save the above as example.c, you can compile it to example using e.g. gcc -Wall -O2 example.c -lpthread -o example.
To get an intuitive grasp of the operations, run the example in a terminal, with the source code in a window next to it, and see how the execution progresses as you provide input.
You can even run commands like printf '%s\n' s s s b q | ./example to run a sequence of events in quick succession, or printf 's\ns\ns\nb\nq\n' | ./example with even less time in between events.
After some experimentation, you'll hopefully find out that not all input events cause their respective action. This is because the exit event (q above) is not synchronous: it does not wait for all pending work to be done, but tells the threads to exit right then and there. That is why the number of events may vary even for the exact same input.
(Also, if you signal on the condition variable, and immediately broadcast on it, the threads tend to only get woken up once.)
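The underlying reason is that a condition variable does not count or remember wakeups: a signal or broadcast sent while no thread is waiting simply has no effect. Here is a small sketch that tries to show this, assuming a hypothetical helper named wait_after_signal() (not part of the example above) that signals first and only then waits, with a timeout.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <errno.h>
#include <time.h>

/* Signal and broadcast on a condition variable nobody is waiting on,
   then wait on it for at most timeout_ms milliseconds.
   Returns the result of pthread_cond_timedwait(). */
static int wait_after_signal(long timeout_ms)
{
    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
    struct timespec deadline;
    int             result;

    /* No thread is waiting yet, so these wake up nobody
       and are not remembered for later. */
    pthread_cond_signal(&cond);
    pthread_cond_broadcast(&cond);

    /* Absolute deadline, timeout_ms milliseconds from now. */
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
    deadline.tv_sec  += timeout_ms / 1000 + deadline.tv_nsec / 1000000000L;
    deadline.tv_nsec %= 1000000000L;

    /* The earlier signal and broadcast are gone, so this wait
       is expected to run into the deadline. */
    pthread_mutex_lock(&lock);
    result = pthread_cond_timedwait(&cond, &lock, &deadline);
    pthread_mutex_unlock(&lock);

    return result;
}
```

Barring a spurious wakeup, wait_after_signal(50) should return ETIMEDOUT, because the earlier wakeups were never stored anywhere. In the example program, the same thing happens whenever an event arrives while the workers are still busy printing rather than waiting.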
You can mitigate that by delaying the exit, using e.g. (printf '%s\n' s s b s s s ; sleep 1 ; printf 'q\n' ) | ./example.
However, there are better ways. A condition variable is not suitable for countable events; it is really flag-like. A semaphore would work better, but then you should be careful not to overflow the semaphore; its value can only range from 0 to SEM_VALUE_MAX, inclusive. (So, you could use a semaphore to represent the number of pending jobs, but probably not the number of iterations done by each worker thread, or by all of them together.) A queue for the work to do, in thread pool fashion, is the most common approach.
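As a rough sketch of the thread pool idea, the "queue" below is reduced to a counter of pending jobs protected by the mutex; the condition variable is only used to put idle workers to sleep. struct jobs, job_worker() and run_jobs() are hypothetical names invented for this illustration, and a real pool would store actual job descriptions instead of a bare count.

```c
#include <pthread.h>

#define MAX_WORKERS 16

/* Hypothetical job counter shared by all workers. */
struct jobs {
    pthread_mutex_t lock;
    pthread_cond_t  wait;
    unsigned long   pending;   /* Jobs submitted but not yet taken */
    unsigned long   handled;   /* Jobs completed by all workers */
    int             done;      /* Nonzero: exit once no jobs are pending */
};

static void *job_worker(void *ptr)
{
    struct jobs *const j = ptr;

    pthread_mutex_lock(&j->lock);
    for (;;) {
        if (j->pending > 0) {
            /* Take one job. A real worker would release the lock
               while doing the work, and re-grab it afterwards. */
            j->pending--;
            j->handled++;
        } else if (j->done) {
            break;  /* Nothing left to do, and no more jobs will arrive. */
        } else {
            pthread_cond_wait(&j->wait, &j->lock);
        }
    }
    pthread_mutex_unlock(&j->lock);
    return NULL;
}

/* Submit 'count' jobs to up to 'workers' threads.
   Returns the number of jobs actually handled. */
static unsigned long run_jobs(unsigned long count, int workers)
{
    struct jobs   j;
    pthread_t     id[MAX_WORKERS];
    unsigned long k;
    int           i, n = 0;

    if (workers > MAX_WORKERS)
        workers = MAX_WORKERS;

    pthread_mutex_init(&j.lock, NULL);
    pthread_cond_init(&j.wait, NULL);
    j.pending = 0;
    j.handled = 0;
    j.done = 0;

    for (i = 0; i < workers; i++)
        if (pthread_create(&id[n], NULL, job_worker, &j) == 0)
            n++;

    /* Each job is recorded in the counter, so none is lost
       even if every worker is busy when it is submitted. */
    for (k = 0; k < count; k++) {
        pthread_mutex_lock(&j.lock);
        j.pending++;
        pthread_mutex_unlock(&j.lock);
        pthread_cond_signal(&j.wait);
    }

    /* Synchronous exit: workers drain the pending jobs first. */
    pthread_mutex_lock(&j.lock);
    j.done = 1;
    pthread_mutex_unlock(&j.lock);
    pthread_cond_broadcast(&j.wait);

    for (i = 0; i < n; i++)
        pthread_join(id[i], NULL);

    pthread_cond_destroy(&j.wait);
    pthread_mutex_destroy(&j.lock);
    return j.handled;
}
```

With this structure the count lives in ordinary variables guarded by the mutex, so run_jobs(1000, 4) should report 1000 handled jobs on every run, provided at least one worker thread could be created.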