You don't need a separate "sleep" mode.
Telling a thread to "stop" by sending it a signal is clunky. It is better to send a message with a "stop" flag in it. This allows for a graceful shutdown (without the race condition issues that come with the signal).
To prevent threads from going into "spin" loops that burn CPU, we want a thread to sleep and wake up at just the right time for its next work request.
While we can use condition variables (e.g. pthread_cond_wait/pthread_cond_signal), there are performance issues.
Sending a pthread_cond_signal will only wake up one thread. And, it may be the "wrong" thread (or always the same one). So, we really want to do a pthread_cond_broadcast and wake up all sleeping threads.
But, if the broadcast is sent just before a thread goes to sleep, it will not signal it. And, another thread may have already "cleared" the condition variable. So, the second thread will have "missed" the wakeup.
And pthread_cond_broadcast wakes up all sleeping threads, even though only one may "get" the request block [on a queue]. The others will go back to sleep, having been woken up needlessly.
In other words, the woken up threads will race to grab the next element from the queue that the condition variable is used for.
This may be okay if each worker thread has its own private [per-thread] work request queue.
But, having a separate queue for each thread can result in a request waiting for a long time if the queue it's waiting on belongs to a thread that is currently working on a request that is taking an extended amount of time to complete.
So, the pending request just sits there, on the queue, even if other worker threads are idle.
One way to solve this is to allow "work stealing" by other threads. That is, if their own queue is empty, they go looking at the request queues of other workers. They, then, "steal" the work by dequeuing it from another thread's queue.
That is a known, valid way to do it. But, a much simpler way is to have all threads use a single work request queue. It has less overhead than the work stealing approach and can be more robust.
What I've found to work well in practice (on a commercial grade shipping product) is to use IPC message queues [instead of condition variables].
I use these in conjunction with mutex protected doubly linked lists of "work request" structs.
They work cleanly with multiple producers and multiple consumers.
Although things have changed a lot, when I first started using IPC messages, circa 2004, msgrcv was the only mechanism that guaranteed that the system would wake up the thread and do an immediate reschedule so that the receiver would be run ASAP.
Here is a simplified [and working] example of what I mean:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdint.h>
#include <stdatomic.h>
#include <pthread.h>
#include <time.h>
#include <stdarg.h>
#include <sys/ipc.h>
#include <sys/msg.h>
// sysfault -- report a fatal error and terminate
// Takes printf-style arguments (format string first); the text is emitted
// through _dbgprt and the program is then ended by _sysfault.
// Uses the standard C99 __VA_ARGS__ form rather than a GNU named variadic
// parameter.
#define sysfault(...) \
    do { \
        _dbgprt(__VA_ARGS__); \
        _sysfault(); \
    } while (0)
// dbgprt -- optional trace output
// With DEBUG set to a non-zero value this forwards its printf-style arguments
// to _dbgprt; otherwise it expands to an empty statement and its arguments are
// not evaluated.
// Uses the standard C99 __VA_ARGS__ form rather than a GNU named variadic
// parameter.
#if DEBUG
#define dbgprt(...) \
    _dbgprt(__VA_ARGS__)
#else
#define dbgprt(...) \
    do { } while (0)
#endif
// inline_always -- function qualifier: static inline plus the always_inline attribute
#define inline_always static inline __attribute__((__always_inline__))
// inline_never -- function qualifier: the noinline attribute (used on _sysfault and _dbgprt)
#define inline_never __attribute__((__noinline__))
// worktype -- kind of operation requested in a work_t (stored in work_type)
// worker_thread stores a rand_r value as the result for WORK_SUM, WORK_AVG and
// WORK_FACT and performs no computation for WORK_STOP.
enum worktype {
WORK_SUM,
WORK_AVG,
WORK_FACT,
WORK_STOP,
};
// number of worktype values; main picks work_type as rand_r() % WORKTYPEMAX,
// so WORK_STOP is among the generated types
#define WORKTYPEMAX (WORK_STOP + 1)
// shorthand integer type names used throughout the file
typedef unsigned int u32; // unsigned int
typedef long long s64; // signed long long
typedef unsigned long long u64; // unsigned long long
// work descriptor
// One request block. It is linked into exactly one queue_t list at a time via
// work_prev/work_next (see queput/queget).
typedef struct work {
struct work *work_prev; // pointer to previous entry
struct work *work_next; // pointer to next entry
int work_bufno; // buffer number (allocation index assigned in queinitall)
enum worktype work_type; // type of work
void *work_data; // pointer to work data
u32 work_count; // count of work data
u32 work_xid; // tsk_xid of the task that last passed this block to queput
u64 work_result; // work result
} work_t;
// work queue
// A mutex-protected doubly linked list of work blocks, optionally paired with
// a msgsnd/msgrcv message type (que_msqid) that is used to wake up consumers.
typedef struct queue {
    struct queue *que_next;             // pointer to next queue
    const char *que_sym;                // queue name
    struct work *que_head;              // head of work list
    struct work *que_tail;              // tail of work list
    int que_count;                      // number of elements in queue
    pthread_mutex_t que_mutex;          // lock for the list and que_count

    // msgsnd/msgrcv msqid -- a value <= 0 means no messages are exchanged for
    // this queue. queput/queget read it with atomic_load(), which requires an
    // atomic object, hence the _Atomic qualifier.
    _Atomic long que_msqid;

    int que_stop;                       // 1=stop
} queue_t;
queue_t *queue_list;                    // list of queues (linked via que_next)
queue_t queue_free;                     // free queue
queue_t queue_active;                   // active queue
queue_t queue_result;                   // result queue
int quefd;                              // message queue identifier (from msgget)

// 1=queues requested to stop
// questopall sets this with atomic_store(), which requires an atomic object
atomic_int questop;
// task control
// Per-thread state: one entry per worker in tsklist, plus tskm for the master.
typedef struct {
pthread_t tsk_tid; // task thread id (filled in by pthread_create in tsklaunch)
u32 tsk_xid; // task id
void *(*tsk_fnc)(void *); // task function (invoked by tskstart)
FILE *tsk_xfdbg; // debug (trace stream; _dbgprt falls back to stderr when NULL)
u32 tsk_rand; // random number seed (passed to rand_r)
int tsk_stop; // 1=task got stop request (set by queget)
int tsk_done; // 1=task done (set by tskstart after tsk_fnc returns)
} tsk_t;
tsk_t *tsklist; // list of tasks (array allocated in tskinitall)
tsk_t *tskm; // master task
__thread tsk_t *tskcur; // current task (one pointer per thread)
// TSKFORALL -- loop header clauses for visiting every worker task
// Use as: for (TSKFORALL(tsk)) ... -- declares _tsk in the for-init and walks
// tsklist[0] through tsklist[opt_T - 1].
#define TSKFORALL(_tsk) \
tsk_t *_tsk = &tsklist[0]; _tsk < &tsklist[opt_T]; ++_tsk
// msgsnd/msgrcv message
// The leading long is the message type: queput/questopall fill it from the
// target queue's que_msqid and queget passes that same value to msgrcv to
// select messages. The remaining fields are the payload, which is why the
// callers pass sizeof(xmsg_t) - sizeof(long) as the message size.
typedef struct {
long xmsg_msqid; // msgsnd/msgrcv msqid
u32 xmsg_xid; // message sender task id
work_t *xmsg_work; // pointer to work
int xmsg_stop; // 1=stop queue request
} xmsg_t;
// command line options (parsed in main: -L, -M, -N, -T)
const char *opt_L; // output trace directory
int opt_M; // number of worker message structs
int opt_N; // number of messages to send
int opt_T; // number of workers
// forward declaration of the worker task function
void *worker_thread(void *vptr);
// time base subtracted by tscgetf; main sets it from tscgetf() at startup
double tsczero;
// tscgetf -- get the current time in fractional seconds
// Reads CLOCK_MONOTONIC and returns it as a double number of seconds, less
// the global tsczero.
double
tscgetf(void)
{
    struct timespec now;

    clock_gettime(CLOCK_MONOTONIC,&now);

    // fractional part first, then whole seconds, then rebase on tsczero
    double frac = now.tv_nsec;
    frac /= 1e9;

    return (frac + now.tv_sec) - tsczero;
}
// _sysfault -- abnormal program termination
// Ends the whole process with exit status 1. Kept out of line (inline_never)
// so the sysfault macro expands to a plain call.
inline_never void
_sysfault(void)
{

    exit(1);
}
inline_never void
_dbgprt(const char *fmt,...)
{
va_list ap;
char buf[1000];
char *bp = buf;
double tsc = tscgetf();
bp += sprintf(bp,"[%.9f] t%4.4u ",tsc,tskcur->tsk_xid);
va_start(ap,fmt);
bp += vsprintf(bp,fmt,ap);
va_end(ap);
FILE *xferr = NULL;
if (tskcur != NULL)
xferr = tskcur->tsk_xfdbg;
if (xferr == NULL)
xferr = stderr;
// send line atomically
fputs(buf,xferr);
}
// workbufno -- get the buffer number of a work block for tracing
// Returns work->work_bufno, or -1 when work is NULL.
inline_always int
workbufno(work_t *work)
{

    return (work != NULL) ? work->work_bufno : -1;
}
// QDS -- print field _sym of the local variable "ds" as a signed 64 bit
// integer, with _reason as the description (used inside xmsgstat)
#define QDS(_sym,_reason) \
_dbgprt("xmsgstat: " #_sym " %lld -- " _reason "\n", \
(s64) ds._sym)
// QDS_TOD -- print field _sym of the local variable "ds" as a double, after
// subtracting tsczero
// NOTE(review): tsczero comes from tscgetf (CLOCK_MONOTONIC); presumably the
// msqid_ds time fields use a different clock base, so the difference may not
// be a meaningful elapsed time -- confirm
#define QDS_TOD(_sym,_reason) \
_dbgprt("xmsgstat: " #_sym " %.9f -- " _reason "\n", \
(double) ds._sym - tsczero)
// QINFO -- print field _sym of the local variable "info" as a signed 64 bit
// integer, with _reason as the description (used inside xmsgstat)
#define QINFO(_sym,_reason) \
_dbgprt("xmsgstat: " #_sym " %lld -- " _reason "\n", \
(s64) info._sym)
// xmsgstat -- show state
// Dumps the msqid_ds state of the message queue quefd through _dbgprt.
// full -- non-zero: also clamp opt_M to the number of xmsg_t that fit in the
//         queue (msg_qbytes / sizeof(xmsg_t)) and dump the MSG_INFO limits
// who -- name of the caller, shown in the trace
void
xmsgstat(int full,const char *who)
{
    struct msqid_ds ds;

    _dbgprt("xmsgstat: ENTER (from %s)\n",who);

    // without valid stats everything below (including the opt_M clamp) would
    // operate on uninitialized data
    if (msgctl(quefd,IPC_STAT,&ds) < 0)
        sysfault("xmsgstat: msgctl(IPC_STAT) fault -- %s\n",strerror(errno));

    QDS_TOD(msg_stime,"time of last msgsnd");
    QDS_TOD(msg_rtime,"time of last msgrcv");
    QDS_TOD(msg_ctime,"time of last change");

    QDS(__msg_cbytes,"current number of bytes in queue");
    QDS(msg_qnum,"current number of messages in queue");
    QDS(msg_qbytes,"maximum number of bytes allowed in queue");

    if (full) {
        // never keep more request blocks than the queue can hold messages for
        // (the cast keeps the original int -> size_t comparison, explicitly)
        size_t maxmsg = ds.msg_qbytes / sizeof(xmsg_t);
        if ((size_t) opt_M > maxmsg)
            opt_M = (int) maxmsg;

        // the system-wide limits are informational only, so a failure here is
        // reported but not fatal
        struct msginfo info;
        if (msgctl(quefd,MSG_INFO,(void *) &info) < 0)
            _dbgprt("xmsgstat: msgctl(MSG_INFO) fault -- %s\n",
                strerror(errno));
        else {
            QINFO(msgpool,"size in KB of buffer pool used to hold message data");
            QINFO(msgmap,"max number of entries in message map (unused)");
            QINFO(msgmax,"max number of bytes in a single message");
            QINFO(msgmnb,"max number of bytes that can be written to queue");
            QINFO(msgmni,"max number of message queues");
            QINFO(msgssz,"message segment size (unused)");
            QINFO(msgtql,"max number of messages on all queues in system (unused)");
            QINFO(msgseg,"max number of segments (unused)");
        }
    }

    _dbgprt("xmsgstat: EXIT\n");
}
// queput -- send work
// Appends work to the tail of que's list (under que_mutex) and then, when the
// queue has a positive que_msqid, sends one message of that type on quefd so
// that a receiver blocked in queget wakes up.
// que -- destination queue
// work -- request block to enqueue (must not be NULL); its work_xid is set to
//         the calling task's id
// A msgsnd failure other than EINTR is fatal (sysfault).
void
queput(queue_t *que,work_t *work)
{
    xmsg_t xmsg = { 0 };

    dbgprt("queput: ENTER que_sym=%s work_bufno=%d\n",
        que->que_sym,workbufno(work));

    // identify who sent the message
    xmsg.xmsg_xid = tskcur->tsk_xid;
    work->work_xid = tskcur->tsk_xid;

    // link the block in at the tail
    pthread_mutex_lock(&que->que_mutex);

    work_t *tail = que->que_tail;
    work_t *head = que->que_head;
    dbgprt("queput: QPTRS que_head=%d que_tail=%d que_count=%d que_msqid=%ld quefd=%d\n",
        workbufno(head),workbufno(tail),que->que_count,que->que_msqid,quefd);

    work->work_prev = tail;
    if (tail != NULL)
        tail->work_next = work;
    que->que_tail = work;

    work->work_next = NULL;
    if (head == NULL)
        que->que_head = work;

    que->que_count += 1;

    pthread_mutex_unlock(&que->que_mutex);

    // queues without a positive msqid are list-only: nobody waits on them
    xmsg.xmsg_msqid = atomic_load(&que->que_msqid);
    if (xmsg.xmsg_msqid > 0) {
        // send message
        dbgprt("queput: MSGSND\n");

        // an interrupted msgsnd is not restarted by the system, so retry it
        // here rather than treating it as a fault
        int err;
        do {
            err = msgsnd(quefd,&xmsg,sizeof(xmsg_t) - sizeof(long),0);
        } while (err < 0 && errno == EINTR);

        if (err < 0)
            sysfault("queput: msgsnd fault -- %s\n",strerror(errno));
    }

    dbgprt("queput: EXIT\n");
}
// queget -- dequeue next work
// For a queue with a positive que_msqid, first receives one message of that
// type from quefd (unless the calling task has already seen a stop message),
// then removes and returns the block at the head of que's list.
// que -- queue to take from
// flags -- msgrcv flags (e.g. 0 to block, IPC_NOWAIT to poll)
// Returns the dequeued block, or NULL when the list is empty.
// A received stop message sets tskcur->tsk_stop. A msgrcv failure other than
// ENOMSG or EINTR is fatal (sysfault).
work_t *
queget(queue_t *que,int flags)
{
    xmsg_t xmsg = { 0 };
    work_t *work;

    dbgprt("queget: ENTER que_sym='%s' que_msqid=%ld flags=%8.8X\n",
        que->que_sym,que->que_msqid,flags);

    // wait for message
    do {
        // keep the full width of the message type (it is a long)
        long msqid = atomic_load(&que->que_msqid);
        if (msqid <= 0)
            break;

        // already received stop message
        if (tskcur->tsk_stop) {
            dbgprt("queget: TSKSTOP/ALREADY\n");
            break;
        }

        dbgprt("queget: MSGRCV\n");

        // an interrupted msgrcv is not restarted by the system, so retry it
        // here rather than treating it as a fault
        ssize_t len;
        do {
            len = msgrcv(quefd,&xmsg,sizeof(xmsg_t) - sizeof(long),msqid,flags);
        } while (len < 0 && errno == EINTR);

        if (len < 0) {
            // nothing pending (IPC_NOWAIT) -- still look at the list below
            if (errno == ENOMSG)
                break;
            sysfault("queget: msgrcv fault -- %s\n",strerror(errno));
        }

        // process stop message
        if (xmsg.xmsg_stop) {
            dbgprt("queget: TSKSTOP/NOW\n");
            tskcur->tsk_stop = 1;
        }
    } while (0);

    // unlink the head of the list, if any
    pthread_mutex_lock(&que->que_mutex);

    work = que->que_head;
    if (work != NULL) {
        work_t *next = work->work_next;

        que->que_head = next;
        if (next != NULL)
            next->work_prev = NULL;

        if (que->que_tail == work)
            que->que_tail = NULL;

        que->que_count -= 1;
    }

    pthread_mutex_unlock(&que->que_mutex);

    dbgprt("queget: EXIT work=%d\n",workbufno(work));

    return work;
}
// queinit -- initialize one queue
// que -- queue to set up
// sym -- queue name (stored, not copied)
// msqid -- msgsnd/msgrcv message type for this queue
// Fields not assigned here are left as they were.
void
queinit(queue_t *que,const char *sym,long msqid)
{

    dbgprt("queinit: ENTER sym='%s' msqid=%ld\n",sym,msqid);

    // identity and lock
    que->que_sym = sym;
    que->que_msqid = msqid;
    pthread_mutex_init(&que->que_mutex,NULL);

    // push onto the front of the global list of queues
    que->que_next = queue_list;
    queue_list = que;

    dbgprt("queinit: EXIT\n");
}
// queinitall -- initialize message queues
// Creates the private message queue (quefd), initializes the result, active
// and free queues with message types 2, 1 and 0, and puts opt_M freshly
// allocated work blocks on the free queue.
// A msgget or allocation failure is fatal (sysfault).
void
queinitall(void)
{

    dbgprt("queinitall: ENTER\n");

    quefd = msgget(IPC_PRIVATE,IPC_CREAT | 0600);
    if (quefd < 0)
        sysfault("queinitall: msgget fault -- %s\n",strerror(errno));

    // NOTE: the "full" report may lower opt_M, so it must run before the
    // free list is populated below
    xmsgstat(1,"queinitall");

    queinit(&queue_result,"queue_result",2);
    queinit(&queue_active,"queue_active",1);
    queinit(&queue_free,"queue_free",0);

    for (int idx = 0; idx < opt_M; ++idx) {
        work_t *work = calloc(1,sizeof(*work));

        // do not hand a NULL block to queput
        if (work == NULL)
            sysfault("queinitall: calloc fault -- %s\n",strerror(errno));

        work->work_bufno = idx;
        queput(&queue_free,work);
    }

    dbgprt("queinitall: EXIT\n");
}
// questopall -- tell all receivers/workers to stop cleanly
// Sets questop and sends one stop message (xmsg_stop=1) of que's message type
// per worker task, i.e. opt_T messages in total.
// que -- queue whose receivers should stop
// A msgsnd failure other than EINTR is fatal (sysfault).
void
questopall(queue_t *que)
{
    xmsg_t xmsg;

    dbgprt("questopall: ENTER que_sym=%s\n",que->que_sym);

    atomic_store(&questop,1);

    // every stop message is identical, so build it once
    memset(&xmsg,0,sizeof(xmsg));
    xmsg.xmsg_stop = 1;
    xmsg.xmsg_msqid = que->que_msqid;

    for (int idx = 0; idx < opt_T; ++idx) {
        // send message (retrying if interrupted, since the system does not
        // restart msgsnd)
        int err;
        do {
            err = msgsnd(quefd,&xmsg,sizeof(xmsg_t) - sizeof(long),0);
        } while (err < 0 && errno == EINTR);

        if (err < 0)
            sysfault("questopall: msgsnd fault -- %s\n",strerror(errno));
    }

    dbgprt("questopall: EXIT\n");
}
// tskstart -- thread entry point for a task
// ptr -- the tsk_t of the task being started
// Publishes the task as the thread's current task, runs its tsk_fnc, closes
// the task's trace stream and marks the task done.
// Returns whatever tsk_fnc returned.
void *
tskstart(void *ptr)
{
    tsk_t *self = ptr;

    // make this the thread's current task, then run the task body
    tskcur = self;
    void *ret = self->tsk_fnc(ptr);

    // the trace stream is no longer needed
    FILE *xfdbg = self->tsk_xfdbg;
    if (xfdbg != NULL) {
        fclose(xfdbg);
        self->tsk_xfdbg = NULL;
    }

    // say we're done
    tskcur->tsk_done = 1;

    return ret;
}
// tskinit -- initialize
// Sets the task id and random seed and, when a trace directory was requested
// (opt_L), opens the line-buffered per-task trace file
// <opt_L>/tsklog_<xid>.txt.
// tsk -- task block to initialize
// xid -- task id to assign
// Opening the trace file is best effort: if the path does not fit or fopen
// fails, tsk_xfdbg is left unset and _dbgprt falls back to stderr.
void
tskinit(tsk_t *tsk,u32 xid)
{
    char file[1000];

    tsk->tsk_xid = xid;
    tsk->tsk_rand = rand();

    if (opt_L == NULL)
        return;

    // opt_L comes from the command line, so bound the path and refuse to
    // open a truncated name
    int len = snprintf(file,sizeof(file),"%s/tsklog_%4.4u.txt",
        opt_L,tsk->tsk_xid);
    if (len < 0 || (size_t) len >= sizeof(file))
        return;

    tsk->tsk_xfdbg = fopen(file,"w");
    if (tsk->tsk_xfdbg != NULL)
        setlinebuf(tsk->tsk_xfdbg);
}
// tskinitall -- initialize all tasks
// Allocates the worker task array (tsklist) and the master task (tskm), makes
// the master the calling thread's current task, and assigns task ids: 0 to
// the master, 1..opt_T to the workers.
// An allocation failure terminates the program.
void
tskinitall(void)
{

    tsklist = calloc(opt_T + 1,sizeof(tsk_t));
    tskm = calloc(1,sizeof(tsk_t));

    // report directly on stderr: _dbgprt consults tskcur, which is only set
    // up below
    if (tsklist == NULL || tskm == NULL) {
        fprintf(stderr,"tskinitall: calloc fault -- %s\n",strerror(errno));
        _sysfault();
    }

    u32 xid = 0;

    tskcur = tskm;
    tskinit(tskm,xid++);

    for (TSKFORALL(tsk), ++xid)
        tskinit(tsk,xid);
}
// tsklaunch -- create new task
// Starts a thread that runs tskstart on tsk.
// tsk -- task block to launch
// fnc -- task function to use when the task does not already have one
// A pthread_create failure is fatal (sysfault); otherwise tskjoinall would
// later join a thread id that was never filled in.
void
tsklaunch(tsk_t *tsk,void *(*fnc)(void *))
{

    dbgprt("tsklaunch: ENTER tsk_xid=%u\n",tsk->tsk_xid);

    if (tsk->tsk_fnc == NULL)
        tsk->tsk_fnc = fnc;

    // pthread_create returns the error number directly (it does not set errno)
    int err = pthread_create(&tsk->tsk_tid,NULL,tskstart,tsk);
    if (err != 0)
        sysfault("tsklaunch: pthread_create fault -- %s\n",strerror(err));

    dbgprt("tsklaunch: EXIT tsk_xid=%u\n",tsk->tsk_xid);
}
// tsklaunchall -- create/launch all tasks
void
tsklaunchall(void *(*fnc)(void *))
{
for (TSKFORALL(tsk))
tsklaunch(tsk,fnc);
}
// tskjoinall -- initialize
void
tskjoinall(void)
{
dbgprt("tskjoinall: ENTER\n");
for (TSKFORALL(tsk)) {
dbgprt("tskjoinall: LOOP tsk_xid=%u\n",tsk->tsk_xid);
pthread_join(tsk->tsk_tid,NULL);
if (tsk->tsk_xfdbg != NULL)
fclose(tsk->tsk_xfdbg);
}
dbgprt("tskjoinall: EXIT\n");
}
// worker_thread -- sample worker thread function
void *
worker_thread(void *vptr)
{
while (1) {
// get new work to do
work_t *work = queget(&queue_active,0);
// process work
if (work != NULL) {
switch (work->work_type) {
case WORK_SUM:
case WORK_AVG:
case WORK_FACT:
work->work_result = rand_r(&tskcur->tsk_rand);
break;
case WORK_STOP:
break;
}
// send back results
queput(&queue_result,work);
}
// handle stop requeust
if (tskcur->tsk_stop) {
dbgprt("worker_thread: TSKSTOP/FINAL\n");
break;
}
}
return (void *) 0;
}
// workresult -- collect results from the workers
// Takes finished requests from queue_result, reports each one, and recycles
// it onto queue_free. Stops when every request block is back on the free
// queue or when queget yields nothing.
// flags -- msgrcv flags passed through to queget
void
workresult(int flags)
{

    dbgprt("workresult: ENTER flags=%8.8X\n",flags);

    for (;;) {
        // all blocks are already free -- nothing outstanding
        if (queue_free.que_count >= opt_M)
            break;

        dbgprt("workresult: LOOP queue_active=%d queue_result=%d queue_free=%d\n",
            queue_active.que_count,queue_result.que_count,queue_free.que_count);

        work_t *done = queget(&queue_result,flags);
        if (done == NULL)
            break;

        _dbgprt("master: RESULT work_type=%u work_result=%llu work_xid=%u\n",
            done->work_type,done->work_result,done->work_xid);

        // put request block on free queue
        queput(&queue_free,done);
    }

    dbgprt("workresult: EXIT\n");
}
int
main(int argc,char **argv)
{
--argc;
++argv;
for (; argc > 0; --argc, ++argv) {
char *cp = *argv;
if (*cp != '-')
break;
cp += 2;
switch (cp[-1]) {
case 'L':
opt_L = (*cp != 0) ? cp : ".";
break;
case 'M':
opt_M = (*cp != 0) ? atoi(cp) : 1;
break;
case 'N':
opt_N = (*cp != 0) ? atoi(cp) : 1;
break;
case 'T':
opt_T = (*cp != 0) ? atoi(cp) : 1;
break;
}
}
setlinebuf(stdout);
setlinebuf(stderr);
tsczero = tscgetf();
if (opt_T < 1)
opt_T = 1;
if (opt_M < opt_T)
opt_M = opt_T;
if (opt_N < 0)
opt_N = 0;
printf("T=%d M=%d N=%d\n",opt_T,opt_M,opt_N);
if (opt_L != NULL) {
char cmd[1000];
sprintf(cmd,"mkdir -p %s",opt_L);
system(cmd);
}
// initialize all task blocks
tskinitall();
// initialize all queues
queinitall();
// start all threads
tsklaunchall(worker_thread);
for (int workidx = 0; workidx < opt_N; ++workidx) {
dbgprt("main: LOOP workidx=%d\n",workidx);
work_t *work;
while (1) {
workresult(IPC_NOWAIT);
work = queget(&queue_free,0);
if (work != NULL)
break;
}
// create the work request
work->work_type = rand_r(&tskm->tsk_rand) % WORKTYPEMAX;
work->work_count = rand_r(&tskm->tsk_rand) % 10000;
// send it to the workers
queput(&queue_active,work);
}
// wait for all requests to be processed
dbgprt("master: FINAL\n");
while (queue_free.que_count < opt_M) {
dbgprt("master: FINLOOP que_count=%d opt_M=%d\n",
queue_free.que_count,opt_M);
workresult(0);
}
// send stop requests
questopall(&queue_active);
// join all tasks
tskjoinall();
return 0;
}