
I'm new to multithreading and I'm trying to implement a simple thread-safe queue of tasks from which each thread can pull work until no tasks are left. None of the threads will enqueue new tasks.

For testing purposes, every Task holds just a number.

    static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

    typedef struct Task{
       int number;
    }Task;


    typedef struct Cell{
        Task t;
        struct Cell* next;
    }Cell;


    typedef struct TQueue{
        struct Cell* head;
        struct Cell* tail;
    }TQueue;



    int empty(TQueue *queue){
        return queue->head == queue->tail;
    }


    void startQueue(TQueue *queue){

        queue->head = malloc(sizeof(Cell));
        queue->tail = queue->head;
    }

    void enqueue(TQueue *queue, Task C){

        queue->tail->next = malloc(sizeof(Cell));
        queue->tail = queue->tail->next;
        queue->tail->t = C;
        queue->tail->next = NULL;
    }


    Task * dequeue(TQueue* queue){

       pthread_mutex_lock( &task_mutex);
       Task * t;

       if(empty(queue)) t = NULL;

       else{

           struct Cell* p = queue->head;
           queue->head = queue->head->next;
           t = &queue->head->t;
           free(p);
       }

       pthread_mutex_unlock( &task_mutex);
       return t;
    }

    void * work( void* arg){

       TQueue* queue = (TQueue *)arg;
       Task* t = malloc(sizeof(Task));

       for(t = dequeue(queue); t != NULL; t = dequeue(queue))
           printf("%d ", t->number);

       free(t);
       pthread_exit(NULL);
       return 0;
    }

For a simple test I ran this in main:

int main(){

    TQueue* queue = malloc(sizeof(TQueue));
    startQueue(queue);

    pthread_t threads[3];
    Task t[3];


    for(int i = 0; i < 3; i++){
        t[i].number = i + 1;
        enqueue(queue, t[i]);
    }

    for(int i = 0; i < 3; i++) pthread_create(&threads[i], NULL, work, (void*)queue);

    for(int i = 0; i < 3; i++) pthread_join(threads[i], NULL);

    return 0;
}

The expected output was 1 2 3 in any order, but sometimes it prints a sequence with a strange number in it, like 1823219 2 3. I have not been able to detect any race conditions or related problems, so I'd appreciate any help.

  • `t = &queue->head->t; free(p);` `p` is actually `queue->head`. So `free(p)` will make `t` point to freed memory resulting in Undefined Behaviour. – kaylum Jan 28 '17 at 21:21
  • After fixing the bug kaylum mentioned, your `enqueue` and `dequeue` may be atomic (i.e. `dequeue` will pull the elements off in the correct order), but there is _still_ a race condition: task 1 may dequeue 1 and task 2 may dequeue 2, but task 2 may _print_ first, so you'd get `2 1` because the `for` for printing does _not_ have a lock – Craig Estey Jan 28 '17 at 21:26
  • @kaylum queue->head is moved to queue->head->next before t gets assigned to &queue->head->t. – Pastel Assado Jan 28 '17 at 21:32
  • @CraigEstey I understand that the printing order may vary. The problem is that, in some cases, it prints a completely different number in the sequence, as I mentioned. – Pastel Assado Jan 28 '17 at 21:36
  • You are right. So why is the `head` value not being used each time and just discarded? That could explain why the first value is garbage. Maybe that is a sentinel node, but you haven't shown the startQueue or enqueue functions, so I can't be sure. – kaylum Jan 28 '17 at 21:37
  • If you can post your complete/correct code (e.g. definition of `empty` is missing and, in `dequeue`, the `empty(Queue)` should be `empty(queue)`), I/we could try to run it and see if we get different results – Craig Estey Jan 28 '17 at 21:41
  • At a minimum, in `main`, I think you want `enqueue(queue,&t[i])` instead of `enqueue(queue,t)` which is equivalent to `enqueue(queue,&t[0])` – Craig Estey Jan 28 '17 at 21:56
  • @CraigEstey I added the functions. Thanks for pointing that out. – Pastel Assado Jan 28 '17 at 22:00

1 Answer


I found a few more bugs.

I've annotated your code. I took a bit from your first posting and your second. I've fixed the code, showing before and after [please pardon the gratuitous style cleanup]:

#include <stdio.h>
#include <pthread.h>
#include <malloc.h>

static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

typedef struct Task {
    int number;
} Task;

typedef struct Cell {
// NOTE/BUG: this should be a pointer to the task. otherwise, dequeue gets
// messy
#if 0
    Task t;
#else
    Task *t;
#endif
    struct Cell *next;
} Cell;

typedef struct TQueue {
    struct Cell *head;
    struct Cell *tail;
} TQueue;

void
startQueue(TQueue *queue)
{

#if 0
    queue->head = malloc(sizeof(Cell));
#else
    queue->head = NULL;
#endif
    queue->tail = NULL;
}

int
empty(TQueue *queue)
{

    // NOTE/BUG: dequeue never touches tail, so this test is incorrect
#if 0
    return (queue->head == queue->tail);
#else
    return (queue->head == NULL);
#endif
}

void
enqueue(TQueue *queue, Task *t)
{
    Cell *p;

    pthread_mutex_lock(&task_mutex);

    p = malloc(sizeof(Cell));
    p->next = NULL;
    p->t = t;

    if (queue->tail == NULL) {
        queue->tail = p;
        queue->head = p;
    }
    else {
        queue->tail->next = p;
        queue->tail = p;
    }

    pthread_mutex_unlock(&task_mutex);
}

Task *
dequeue(TQueue *queue)
{
    Task *t;

    pthread_mutex_lock(&task_mutex);

    if (empty(queue))
        t = NULL;

    else {
        Cell *p = queue->head;

        if (p == queue->tail)
            queue->tail = NULL;

        queue->head = p->next;

        // NOTE/BUG: this is setting t to the second element in the list,
        // not the first
        // NOTE/BUG: this is also undefined behavior, in original code (with
        // original struct definition), because what t points to _does_ get
        // freed before return
#if 0
        t = &queue->head->t;
#else
        t = p->t;
#endif

        free(p);
    }

    pthread_mutex_unlock(&task_mutex);

    return t;
}

void *
work(void *arg)
{

    TQueue *queue = (TQueue *) arg;

    // NOTE/BUG: this gets orphaned on the first call to dequeue
#if 0
    Task *t = malloc(sizeof(Task));
#else
    Task *t;
#endif

    for (t = dequeue(queue); t != NULL; t = dequeue(queue))
        printf("%d ", t->number);

    // NOTE/BUG: this frees some cell allocated in main -- not what we want
#if 0
    free(t);
#endif

    pthread_exit(NULL);
    return 0;
}

// For a simple test I ran this in main:

int
main()
{

    TQueue *queue = malloc(sizeof(TQueue));

    startQueue(queue);

    pthread_t threads[3];
    Task t[3];

    for (int i = 0; i < 3; i++) {
        t[i].number = i + 1;
#if 0
        enqueue(queue, t);
#else
        enqueue(queue, &t[i]);
#endif
    }

    for (int i = 0; i < 3; i++)
        pthread_create(&threads[i], NULL, work, (void *) queue);

    for (int i = 0; i < 3; i++)
        pthread_join(threads[i], NULL);

    return 0;
}

UPDATE:

"Are the threads executing the tasks concurrently? I've been testing the CPU usage with htop and I can only max the usage of a single core out of four."

A few things to keep in mind. htop probably won't show much for a program with such a short running time. Even with 10,000 queue entries, this program executes in 20 ms.

It's better to have the program itself print the information [see below]. Note that printf does thread locking on stdout, so it may contribute to the "serial" nature of the program. It also adds a significant amount to the execution time of the program (i.e. the printf is much slower than the dequeue).

Also, one thread (i.e. the first one) could monopolize the queue and drain all entries before the others have a chance to run.

The OS may [is at liberty to] schedule all threads on a single core. It may then "migrate" them later (e.g. within a second or so).
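
Incidentally, if you want to watch that migration happen, glibc on Linux provides sched_getcpu. Here is a minimal sketch (the log_cpu helper name is made up for illustration) that tags output with the core the calling thread is currently on:

#define _GNU_SOURCE
#include <sched.h>                      // sched_getcpu -- glibc/Linux only
#include <stdio.h>

// log_cpu -- hypothetical helper: print which core serviced a dequeue,
// so thread migration shows up in the program's own output
void
log_cpu(int xid, int number)
{

    printf("[thr %5.5d cpu %3d] %d\n", xid, sched_getcpu(), number);
}

Calling something like this from the work loop instead of a bare printf lets you see the scheduler move threads between cores over the course of a run.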

I've enhanced the program to include some timing information in the printed output that may help show more of what you'd like to see. I've also added command line options to control the number of threads and the number of items queued. This is similar to what I do for some of my own programs. Divert the program output to a log file and examine it. Play around with the options over multiple runs.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <malloc.h>
#include <time.h>

int opt_n;                              // suppress thread output
int opt_T;                              // number of threads
int opt_Q;                              // number of queue items

static pthread_mutex_t task_mutex = PTHREAD_MUTEX_INITIALIZER;

double tvzero;

typedef struct Task {
    int number;
} Task;

typedef struct Cell {
    Task *t;
    struct Cell *next;
} Cell;

typedef struct TQueue {
    struct Cell *head;
    struct Cell *tail;
} TQueue;

typedef struct Thread {
    pthread_t tid;
    int xid;
    TQueue *queue;
} Thread;

double
tvgetf(void)
{
    struct timespec ts;
    double sec;

    clock_gettime(CLOCK_REALTIME,&ts);
    sec = ts.tv_nsec;
    sec /= 1e9;
    sec += ts.tv_sec;

    sec -= tvzero;

    return sec;
}

void
startQueue(TQueue *queue)
{

    queue->head = NULL;
    queue->tail = NULL;
}

int
empty(TQueue *queue)
{

    return (queue->head == NULL);
}

void
enqueue(TQueue *queue, Task *t)
{
    Cell *p;

    pthread_mutex_lock(&task_mutex);

    p = malloc(sizeof(Cell));
    p->next = NULL;
    p->t = t;

    if (queue->tail == NULL) {
        queue->tail = p;
        queue->head = p;
    }
    else {
        queue->tail->next = p;
        queue->tail = p;
    }

    pthread_mutex_unlock(&task_mutex);
}

Task *
dequeue(TQueue *queue)
{
    Task *t;

    pthread_mutex_lock(&task_mutex);

    if (empty(queue))
        t = NULL;

    else {
        Cell *p = queue->head;

        if (p == queue->tail)
            queue->tail = NULL;

        queue->head = p->next;

        t = p->t;

        free(p);
    }

    pthread_mutex_unlock(&task_mutex);

    return t;
}

void *
work(void *arg)
{
    Thread *tskcur = arg;
    TQueue *queue = tskcur->queue;
    Task *t;
    double tvbef;
    double tvaft;

    while (1) {
        tvbef = tvgetf();
        t = dequeue(queue);
        tvaft = tvgetf();

        if (t == NULL)
            break;

        if (! opt_n)
            printf("[%.9f/%.9f %5.5d] %d\n",
                tvbef,tvaft - tvbef,tskcur->xid,t->number);
    }

    return (void *) 0;
}

// For a simple test I ran this in main:

int
main(int argc,char **argv)
{
    char *cp;
    TQueue *queue;
    Task *t;
    Thread *tsk;

    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'n':  // suppress thread output
            opt_n = 1;
            break;

        case 'Q':  // number of queue items
            opt_Q = atoi(cp + 2);
            break;

        case 'T':  // number of threads
            opt_T = atoi(cp + 2);
            break;

        default:
            break;
        }
    }

    tvzero = tvgetf();

    queue = malloc(sizeof(TQueue));
    startQueue(queue);

    if (opt_T == 0)
        opt_T = 16;
    Thread threads[opt_T];

    if (opt_Q == 0)
        opt_Q = 10000;
    t = malloc(sizeof(Task) * opt_Q);

    for (int i = 0; i < opt_Q; i++) {
        t[i].number = i + 1;
        enqueue(queue, &t[i]);
    }

    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        tsk->xid = i + 1;
        tsk->queue = queue;
        pthread_create(&tsk->tid, NULL, work, tsk);
    }

    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        pthread_join(tsk->tid, NULL);
    }

    printf("TOTAL: %.9f\n",tvgetf());

    free(t);

    return 0;
}

UPDATE #2:

"Also, one thread (i.e. the first one) could monopolize the queue and drain all entries before the others have a chance to run." What can be done in that case?

A few things.

pthread_create takes a bit of time, allowing thread 1 to start running while the others are still being created. One way to ameliorate this is to create all the threads and have each thread set an "I am running" flag (in its thread control block). The main thread waits for all threads to set this flag, then sets a global volatile "you_may_now_all_run" flag that each thread spins on before entering its primary loop. In my experience, they all start running within microseconds of one another [or better].

I didn't implement this in the updated code below, so you can experiment with it yourself [along with the nanosleep].
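
If you want a starting point, here is a rough sketch of that start gate. This is only a sketch: the StartGate type and function names are made up, and it uses a single shared arrival counter instead of a per-thread flag, which is a slight simplification of the scheme described above.

#include <stdatomic.h>

// hypothetical start gate -- workers announce themselves, then spin
// until main releases all of them at once
typedef struct StartGate {
    atomic_int running;                 // how many workers have arrived
    volatile int go;                    // main sets this to release them
} StartGate;

// called at the top of each worker, before its primary loop
void
gate_wait(StartGate *gate)
{

    atomic_fetch_add(&gate->running, 1);
    while (! gate->go)
        ;                               // spin until main says go
}

// called by main after all the pthread_create calls
void
gate_open(StartGate *gate, int nthread)
{

    // wait for every worker to reach the gate
    while (atomic_load(&gate->running) < nthread)
        ;

    gate->go = 1;                       // release them all together
}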

Mutexes are pretty fair overall [under Linux, at least] because a blocked thread gets queued waiting on the mutex. As I mentioned in the comments, a nanosleep can also be used, but this [somewhat] defeats the purpose, as the threads will slow down.
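
For reference, the nanosleep variant is just a short delay at the bottom of work's loop; a sketch (the work_backoff name and the 1000 ns value are arbitrary starting points, and wiring it to a -S option is left as the exercise suggested in the comments):

#include <time.h>

// work_backoff -- hypothetical helper for the bottom of work's loop:
// give up the CPU briefly so another thread can win the mutex
void
work_backoff(void)
{
    // tune tv_nsec and watch the fairness vs. throughput tradeoff
    struct timespec ts = { .tv_sec = 0, .tv_nsec = 1000 };

    nanosleep(&ts, NULL);
}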

The antidote to thread starvation is "fairness". As I mentioned, there is an elaborate algorithm for fairness without waiting: the Kogan/Petrank algorithm (http://www.cs.technion.ac.il/~erez/Papers/wf-methodology-ppopp12.pdf). This is really a bit involved/advanced, so caveat emptor ...

However, a compromise may be a ticket lock: https://en.wikipedia.org/wiki/Ticket_lock

I've reworked the program again. It has options for pooled allocation, ticket vs. mutex lock, and deferred printing of log entries. It also cross-checks the results between threads to ensure none of them got duplicate entries.

Of course, the key to all this is accurate, high precision logging (i.e. if you can't measure it, you can't tune it).

For example, one would think that doing free inside dequeue would be slower than simply releasing the Cell to a reusable pool (similar to a slab allocator), but the performance boost wasn't as great as expected. This could be because glibc's malloc/free is just blazingly fast [which is what they claim].

These various versions should give you some ideas of how to build up your own performance measurement suite.

Anyway, here's the code:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
#include <malloc.h>
#include <errno.h>
#include <string.h>
#include <time.h>

int opt_p;                              // print thread output immediately
int opt_T;                              // number of threads
int opt_Q;                              // number of queue items
int opt_L;                              // use ticket lock
int opt_M;                              // use fast cell alloc/free

typedef unsigned char byte;
typedef unsigned int u32;

#define sysfault(_fmt...) \
    do { \
        fprintf(stderr,_fmt); \
        exit(1); \
    } while (0)

// lock control
typedef struct AnyLock {
    pthread_mutex_t mutex;              // standard mutex
    volatile u32 seqreq;                // ticket lock request
    volatile u32 seqacq;                // ticket lock grant
} AnyLock;

// work value
typedef struct Task {
    union {
        struct Task *next;
        int number;
    };
} Task;

// queue item
typedef struct Cell {
    struct Cell *next;
    Task *t;
} Cell;

// queue control
typedef struct TQueue {
    struct Cell *head;
    struct Cell *tail;
} TQueue;

// thread log entry
typedef struct Log {
    double tvbef;
    double tvaft;
    int number;
} Log;

#define BTVOFF(_off) \
    ((_off) >> 3)
#define BTVMSK(_off) \
    (1u << ((_off) & 0x07))

#define BTVLEN(_len) \
    (((_len) + 7) >> 3)

// thread control
typedef struct Thread {
    pthread_t tid;
    int xid;
    TQueue *queue;
    Log *log;
    byte *bitv;
} Thread;

static inline byte
btvset(byte *bitv,long off)
{
    u32 msk;
    byte oval;

    bitv += BTVOFF(off);
    msk = BTVMSK(off);

    oval = *bitv & msk;

    *bitv |= msk;

    return oval;
}

AnyLock task_mutex;
AnyLock print_mutex;
double tvzero;
Cell *cellpool;                         // free pool of cells
long bitvlen;

#define BARRIER \
    __asm__ __volatile__("" ::: "memory")

// virtual function pointers
Cell *(*cellnew)(void);
void (*cellfree)(Cell *);
void (*lock_acquire)(AnyLock *lock);
void (*lock_release)(AnyLock *lock);

double
tvgetf(void)
{
    struct timespec ts;
    double sec;

    clock_gettime(CLOCK_REALTIME,&ts);
    sec = ts.tv_nsec;
    sec /= 1e9;
    sec += ts.tv_sec;

    sec -= tvzero;

    return sec;
}

void *
xalloc(size_t cnt,size_t siz)
{
    void *ptr;

    ptr = calloc(cnt,siz);
    if (ptr == NULL)
        sysfault("xalloc: calloc failure -- %s\n",strerror(errno));

    return ptr;
}

void
lock_wait_ticket(AnyLock *lock,u32 newval)
{
    u32 oldval;

    // wait for our ticket to come up
    // NOTE: atomic_load is [probably] overkill here
    while (1) {
#if 0
        oldval = atomic_load(&lock->seqacq);
#else
        oldval = lock->seqacq;
#endif
        if (oldval == newval)
            break;
    }
}

void
lock_acquire_ticket(AnyLock *lock)
{
    u32 oldval;
    u32 newval;
    int ok;

    // acquire our ticket value
    // NOTE: just use a garbage value for oldval -- the exchange will
    // update it with the correct/latest value -- this saves a separate
    // refetch within the loop
    oldval = 0;
    while (1) {
#if 0
        BARRIER;
        oldval = lock->seqreq;
#endif
        newval = oldval + 1;
        ok = atomic_compare_exchange_strong(&lock->seqreq,&oldval,newval);
        if (ok)
            break;
    }

    lock_wait_ticket(lock,newval);
}

void
lock_release_ticket(AnyLock *lock)
{

    // NOTE: atomic_fetch_add is [probably] overkill, but leave it for now
#if 1
    atomic_fetch_add(&lock->seqacq,1);
#else
    lock->seqacq += 1;
#endif
}

void
lock_acquire_mutex(AnyLock *lock)
{

    pthread_mutex_lock(&lock->mutex);
}

void
lock_release_mutex(AnyLock *lock)
{

    pthread_mutex_unlock(&lock->mutex);
}

void
lock_init(AnyLock *lock)
{

    switch (opt_L) {
    case 1:
        lock->seqreq = 0;
        lock->seqacq = 1;
        lock_acquire = lock_acquire_ticket;
        lock_release = lock_release_ticket;
        break;

    default:
        pthread_mutex_init(&lock->mutex,NULL);
        lock_acquire = lock_acquire_mutex;
        lock_release = lock_release_mutex;
        break;
    }
}

void
startQueue(TQueue *queue)
{

    queue->head = NULL;
    queue->tail = NULL;
}

int
empty(TQueue *queue)
{

    return (queue->head == NULL);
}

// cellnew_pool -- allocate a queue entry
Cell *
cellnew_pool(void)
{
    int cnt;
    Cell *p;
    Cell *pool;

    while (1) {
        // try for quick allocation
        p = cellpool;

        // bug out if we got it
        if (p != NULL) {
            cellpool = p->next;
            break;
        }

        // go to the heap to replenish the pool
        cnt = 1000;
        p = xalloc(cnt,sizeof(Cell));

        // link up the entries
        pool = NULL;
        for (;  cnt > 0;  --cnt, ++p) {
            p->next = pool;
            pool = p;
        }

        // put this "online"
        cellpool = pool;
    }

    return p;
}

// cellfree_pool -- release a queue entry
void
cellfree_pool(Cell *p)
{

    p->next = cellpool;
    cellpool = p;
}

// cellnew_std -- allocate a queue entry
Cell *
cellnew_std(void)
{
    Cell *p;

    p = xalloc(1,sizeof(Cell));

    return p;
}

// cellfree_std -- release a queue entry
void
cellfree_std(Cell *p)
{

    free(p);
}

void
enqueue(TQueue *queue, Task *t)
{
    Cell *p;

    lock_acquire(&task_mutex);

    p = cellnew();
    p->next = NULL;
    p->t = t;

    if (queue->tail == NULL) {
        queue->tail = p;
        queue->head = p;
    }
    else {
        queue->tail->next = p;
        queue->tail = p;
    }

    lock_release(&task_mutex);
}

Task *
dequeue(TQueue *queue)
{
    Task *t;

    lock_acquire(&task_mutex);

    if (empty(queue))
        t = NULL;

    else {
        Cell *p = queue->head;

        if (p == queue->tail)
            queue->tail = NULL;

        queue->head = p->next;

        t = p->t;

        cellfree(p);
    }

    lock_release(&task_mutex);

    return t;
}

void *
work(void *arg)
{
    Thread *tskcur = arg;
    TQueue *queue = tskcur->queue;
    Task *t;
    Log *log;
    long cnt;
    int tprev;
    byte *bitv;
    double tvbeg;
    double tvbef;
    double tvaft;

    log = tskcur->log;
    bitv = tskcur->bitv;
    tvbeg = tvgetf();

    tprev = 0;
    while (1) {
        tvbef = tvgetf();
        t = dequeue(queue);
        tvaft = tvgetf();

        if (t == NULL)
            break;

        // abort if we get a double entry
        if (btvset(bitv,t->number))
            sysfault("work: duplicate\n");

        if (opt_p) {
            printf("[%.9f/%.9f %5.5d] %d [%d]\n",
                tvbef,tvaft - tvbef,tskcur->xid,t->number,t->number - tprev);
            tprev = t->number;
            continue;
        }

        log->tvbef = tvbef;
        log->tvaft = tvaft;
        log->number = t->number;
        ++log;
    }

    if (! opt_p) {
        tvaft = tvgetf();

        cnt = log - tskcur->log;
        log = tskcur->log;

        lock_acquire(&print_mutex);

        printf("\n");
        printf("THREAD=%5.5d START=%.9f STOP=%.9f ELAP=%.9f TOTAL=%ld\n",
            tskcur->xid,tvbeg,tvaft,tvaft - tvbeg,cnt);

        tprev = 0;
        for (;  cnt > 0;  --cnt, ++log) {
            printf("[%.9f/%.9f %5.5d] %d [%d]\n",
                log->tvbef,log->tvaft - log->tvbef,tskcur->xid,
                log->number,log->number - tprev);
            tprev = log->number;
        }

        lock_release(&print_mutex);
    }

    return (void *) 0;
}

void
btvchk(Thread *tska,Thread *tskb)
{
    byte *btva;
    byte *btvb;
    byte aval;
    byte bval;
    int idx;

    printf("btvchk: %d ??? %d\n",tska->xid,tskb->xid);

    btva = tska->bitv;
    btvb = tskb->bitv;

    // abort if we get overlapping entries between two threads
    for (idx = 0;  idx < bitvlen;  ++idx) {
        aval = btva[idx];
        bval = btvb[idx];
        if (aval & bval)
            sysfault("btvchk: duplicate\n");
    }
}

// For a simple test I ran this in main:

int
main(int argc,char **argv)
{
    char *cp;
    TQueue *queue;
    Task *t;
    Thread *tsk;

    --argc;
    ++argv;

    for (;  argc > 0;  --argc, ++argv) {
        cp = *argv;
        if (*cp != '-')
            break;

        switch (cp[1]) {
        case 'p':  // print immediately
            opt_p = 1;
            break;

        case 'Q':  // number of queue items
            opt_Q = atoi(cp + 2);
            break;

        case 'T':  // number of threads
            opt_T = atoi(cp + 2);
            break;

        case 'L':
            opt_L = 1;
            break;

        case 'M':
            opt_M = 1;
            break;

        default:
            break;
        }
    }

    printf("p=%d -- thread log is %s\n",opt_p,opt_p ? "immediate" : "deferred");

    if (opt_T == 0)
        opt_T = 16;
    printf("T=%d (number of threads)\n",opt_T);

    if (opt_Q == 0)
        opt_Q = 1000000;
    printf("Q=%d (number of items to enqueue)\n",opt_Q);

    printf("L=%d -- lock is %s\n",opt_L,opt_L ? "ticket" : "mutex");
    printf("M=%d -- queue item allocation is %s\n",
        opt_M,opt_M ? "pooled" : "malloc/free");

    tvzero = tvgetf();

    lock_init(&task_mutex);
    lock_init(&print_mutex);

    // select queue item allocation strategy
    switch (opt_M) {
    case 1:
        cellnew = cellnew_pool;
        cellfree = cellfree_pool;
        break;

    default:
        cellnew = cellnew_std;
        cellfree = cellfree_std;
        break;
    }

    queue = xalloc(1,sizeof(TQueue));
    startQueue(queue);

    Thread threads[opt_T];

    // get byte length of bit vectors
    bitvlen = BTVLEN(opt_Q + 1);

    // allocate per-thread log buffers
    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        if (! opt_p)
            tsk->log = xalloc(opt_Q,sizeof(Log));
        tsk->bitv = xalloc(bitvlen,sizeof(byte));
    }

    // allocate "work to do"
    t = xalloc(opt_Q,sizeof(Task));

    // add to master queue
    for (int i = 0; i < opt_Q; i++) {
        t[i].number = i + 1;
        enqueue(queue, &t[i]);
    }

    // fire up the threads
    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        tsk->xid = i + 1;
        tsk->queue = queue;
        pthread_create(&tsk->tid, NULL, work, tsk);
    }

    // wait for threads to complete
    for (int i = 0; i < opt_T; i++) {
        tsk = &threads[i];
        pthread_join(tsk->tid, NULL);
    }

    // cross-check results: no task number should appear in two threads
    for (int i = 0; i < opt_T; i++) {
        for (int j = i + 1; j < opt_T; j++)
            btvchk(&threads[i],&threads[j]);
    }

    printf("TOTAL: %.9f\n",tvgetf());

    free(t);

    return 0;
}
  • Thanks a lot for your help! Just another question: Are the threads executing the tasks concurrently? I've been testing the CPU usage with htop and I can only max the usage of a single core out of four. – Pastel Assado Jan 29 '17 at 20:54
  • Very nice! Thanks again. As you said: "Also, one thread (i.e. the first one) could monopolize the queue and drain all entries before the others have a chance to run." What can be done in that case? – Pastel Assado Jan 29 '17 at 23:04
  • That case is [technically] called "thread starvation". The simple fix is to add a small `nanosleep` call to the bottom of the work thread's loop. If you understand how I did program options, try adding a `-S` option and the call. In the general case solution, there is an _elaborate_ algorithm that _guarantees_ fairness. There is another simpler one that I "reinvented" that is based on a "ticket" lock, that may be "good enough". These replace the mutex with `atomic_compare_exchange*` from `stdatomic.h`. I'll dig up some url's on this and update my answer later – Craig Estey Jan 29 '17 at 23:24