I'm looking for a recommended implementation of a thread-safe blocking queue (multi producer/consumer) in C using pthread synchronization semantics.
Asked
Active
Viewed 2.5k times
2 Answers
30
Here's one I use
threadqueue.h
#ifndef _THREADQUEUE_H_
#define _THREADQUEUE_H_ 1
#include <pthread.h>
#ifdef __cplusplus
extern "C" {
#endif
/**
* @defgroup ThreadQueue ThreadQueue
*
* Little API for waitable queues, typically used for passing messages
* between threads.
*
*/
/**
* @mainpage
*/
/**
* A thread message.
*
* @ingroup ThreadQueue
*
* A pointer to this struct is passed to #thread_queue_get to retrieve a message.
* The data is stored in the data member, the message type in msgtype.
*
* Typical usage:
* @code
* struct threadmsg msg;
* struct myfoo *foo;
* while(1) {
*     ret = thread_queue_get(&queue,NULL,&msg);
*     ..
*     foo = msg.data;
*     switch(msg.msgtype){
*     ...
*     }
* }
* @endcode
*
*/
struct threadmsg{
/**
* Holds the data. This is the pointer that was handed to #thread_queue_add;
* the queue never copies what it points to.
*/
void *data;
/**
* Holds the message type, exactly as given to #thread_queue_add.
*/
long msgtype;
/**
* Holds the number of messages left in the queue right after this message
* was removed. Might not be meaningful if there are several readers.
*/
long qlength;
};
/**
* A ThreadQueue
*
* @ingroup ThreadQueue
*
* You should treat this struct as opaque: never set or get any
* of its members directly. You have been warned.
*/
struct threadqueue {
/**
* Length of the queue, never set this, never read this.
* Use #thread_queue_length to read it.
*/
long length;
/**
* Mutex protecting every other member of the queue, never touch.
*/
pthread_mutex_t mutex;
/**
* Condition variable that readers wait on while the queue is empty, never touch.
*/
pthread_cond_t cond;
/**
* Internal pointers to the head (first) and tail (last) of the list of
* pending messages, never touch.
*/
struct msglist *first,*last;
/**
* Internal cache of unused msglist nodes kept for reuse, never touch.
*/
struct msglist *msgpool;
/**
* Number of elements currently held in msgpool.
*/
long msgpool_length;
};
/**
* Initializes a queue.
*
* @ingroup ThreadQueue
*
* thread_queue_init initializes a new threadqueue. A new queue must always
* be initialized before it is used.
*
* @param queue Pointer to the queue that should be initialized
* @return 0 on success, EINVAL if queue is NULL, otherwise the error code
* returned by pthread_cond_init or pthread_mutex_init
*/
int thread_queue_init(struct threadqueue *queue);
/**
* Adds a message to a queue
*
* @ingroup ThreadQueue
*
* thread_queue_add adds a "message" to the specified queue. A message
* is just a pointer to anything of the user's choice. Nothing is copied,
* so the user must keep track of (de)allocation of the data.
* A message type is also specified; it is not used for anything other than
* being handed back when the message is retrieved from the queue.
*
* @param queue Pointer to the queue to which the message should be added.
* @param data the "message".
* @param msgtype a long specifying the message type, choice of the user.
* @return 0 on success, ENOMEM if out of memory, EINVAL if queue is NULL
*/
int thread_queue_add(struct threadqueue *queue, void *data, long msgtype);
/**
* Gets a message from a queue
*
* @ingroup ThreadQueue
*
* thread_queue_get gets a message from the specified queue. It will block
* the calling thread until a message arrives, or the (optional) timeout occurs.
* If timeout is NULL, there will be no timeout, and thread_queue_get will wait
* until a message arrives.
*
* The timeout is a relative interval: it is added to the current time of day
* to form the deadline for the wait.
*
* struct timespec is defined as:
* @code
* struct timespec {
* long tv_sec; // seconds
* long tv_nsec; // nanoseconds
* };
* @endcode
*
* @param queue Pointer to the queue to wait on for a message.
* @param timeout how long to wait for a message, or NULL to wait forever
* @param msg pointer that is filled in with message type, data and queue length
*
* @return 0 on success, EINVAL if queue or msg is NULL, ETIMEDOUT if the timeout occurs
*/
int thread_queue_get(struct threadqueue *queue, const struct timespec *timeout, struct threadmsg *msg);
/**
* Gets the length of a queue
*
* @ingroup ThreadQueue
*
* thread_queue_length returns the number of messages waiting in the queue.
* The value is read while holding the queue's mutex, but other threads may
* add or remove messages as soon as the call returns.
*
* @param queue Pointer to the queue for which to get the length
* @return the length (number of pending messages) of the queue
*/
long thread_queue_length( struct threadqueue *queue );
/**
* @ingroup ThreadQueue
* Cleans up the queue.
*
* thread_queue_cleanup cleans up and destroys the queue.
* This will remove all messages from a queue, and reset it. If
* freedata is != 0, free(3) will be called on all pending messages in the queue.
* You must not call this while anyone is currently adding or getting messages
* from the queue.
* After a queue has been cleaned, it cannot be used again until #thread_queue_init
* has been called on the queue.
*
* @param queue Pointer to the queue that should be cleaned
* @param freedata set to nonzero if free(3) should be called on remaining
* messages
* @return 0 on success, EINVAL if queue is NULL, EBUSY if someone is holding any locks on the queue
*/
int thread_queue_cleanup(struct threadqueue *queue, int freedata);
#ifdef __cplusplus
}
#endif
#endif
threadqueue.c
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/time.h>
#include "../h/threadqueue.h"
/* Base size used by release_msglist when deciding how many spare nodes to keep cached. */
#define MSGPOOL_SIZE 256
/*
 * Singly linked list node. The same node type is used both for the list of
 * pending messages and for the per-queue cache of spare nodes (msgpool).
 */
struct msglist {
struct threadmsg msg;
struct msglist *next;
};
/*
 * Obtains a list node for a new message.
 *
 * A cached node is popped from the queue's msgpool when one is available;
 * otherwise a fresh node is allocated with malloc. The caller is expected to
 * hold the queue mutex, since the pool is modified without further locking.
 *
 * Returns the node, or NULL when the pool is empty and malloc fails.
 */
static inline struct msglist *get_msglist(struct threadqueue *queue)
{
    struct msglist *node = queue->msgpool;

    if (node == NULL) {
        /* Pool exhausted: fall back to the heap. */
        return malloc(sizeof *node);
    }

    /* Unlink the head of the pool and hand it out. */
    queue->msgpool = node->next;
    queue->msgpool_length--;
    return node;
}
/*
 * Gives a list node back after its message has been consumed.
 *
 * The node is cached on the queue's msgpool unless the pool already exceeds
 * length/8 + MSGPOOL_SIZE entries, in which case it is freed. Afterwards, if
 * the pool is larger than length/4 + MSGPOOL_SIZE*10, one extra cached node
 * is freed so an oversized pool shrinks gradually. The caller is expected to
 * hold the queue mutex.
 */
static inline void release_msglist(struct threadqueue *queue,struct msglist *node)
{
    const long keep_limit = queue->length / 8 + MSGPOOL_SIZE;
    const long trim_limit = queue->length / 4 + MSGPOOL_SIZE * 10;

    if (queue->msgpool_length <= keep_limit) {
        /* Room in the cache: scrub the payload and push the node on the pool. */
        node->msg.data = NULL;
        node->msg.msgtype = 0;
        node->next = queue->msgpool;
        queue->msgpool = node;
        queue->msgpool_length++;
    } else {
        free(node);
    }

    if (queue->msgpool_length > trim_limit) {
        /* Pool is far too large for the current queue length: drop one node. */
        struct msglist *victim = queue->msgpool;

        queue->msgpool = victim->next;
        free(victim);
        queue->msgpool_length--;
    }
}
/*
 * Prepares a queue for use: zeroes the whole structure, then creates its
 * condition variable and mutex with default attributes.
 *
 * Returns 0 on success, EINVAL when queue is NULL, or the error reported by
 * pthread_cond_init / pthread_mutex_init. If the mutex cannot be created the
 * already-created condition variable is destroyed again.
 */
int thread_queue_init(struct threadqueue *queue)
{
    int rc;

    if (!queue) {
        return EINVAL;
    }

    memset(queue, 0, sizeof *queue);

    rc = pthread_cond_init(&queue->cond, NULL);
    if (rc == 0) {
        rc = pthread_mutex_init(&queue->mutex, NULL);
        if (rc != 0) {
            /* Roll back the half-finished initialisation. */
            pthread_cond_destroy(&queue->cond);
        }
    }

    return rc;
}
/*
 * Appends a message to the tail of the queue.
 *
 * The data pointer and msgtype are stored as given; nothing is copied.
 * When the queue goes from empty to non-empty, every thread waiting on the
 * queue's condition variable is woken.
 *
 * Returns 0 on success, EINVAL if queue is NULL, ENOMEM if no list node
 * could be obtained.
 */
int thread_queue_add(struct threadqueue *queue, void *data, long msgtype)
{
    struct msglist *newmsg;

    /* The header documents EINVAL for a NULL queue; honour that instead of crashing. */
    if (queue == NULL) {
        return EINVAL;
    }

    pthread_mutex_lock(&queue->mutex);

    newmsg = get_msglist(queue);
    if (newmsg == NULL) {
        pthread_mutex_unlock(&queue->mutex);
        return ENOMEM;
    }

    newmsg->msg.data = data;
    newmsg->msg.msgtype = msgtype;
    newmsg->next = NULL;

    if (queue->last == NULL) {
        /* Empty list: the new node is both head and tail. */
        queue->last = newmsg;
        queue->first = newmsg;
    } else {
        queue->last->next = newmsg;
        queue->last = newmsg;
    }

    /* Readers only block while the queue is empty, so only that transition needs a wake-up. */
    if (queue->length == 0) {
        pthread_cond_broadcast(&queue->cond);
    }
    queue->length++;

    pthread_mutex_unlock(&queue->mutex);
    return 0;
}
/*
 * Removes the oldest message from the queue, blocking while the queue is empty.
 *
 * timeout, when non-NULL, is a relative interval; it is added to the current
 * time of day to form the absolute deadline handed to pthread_cond_timedwait.
 * A NULL timeout waits indefinitely.
 *
 * On success msg receives the data pointer, the message type and the number
 * of messages left in the queue after this one was removed.
 *
 * Returns 0 on success, EINVAL if queue or msg is NULL, ETIMEDOUT if the
 * deadline passed while the queue was still empty. Any other error reported
 * by the condition-variable wait is returned as-is when no message is
 * available.
 */
int thread_queue_get(struct threadqueue *queue, const struct timespec *timeout, struct threadmsg *msg)
{
    struct msglist *firstrec;
    int ret = 0;
    struct timespec abstimeout;

    if (queue == NULL || msg == NULL) {
        return EINVAL;
    }

    if (timeout) {
        struct timeval now;
        long nsec;

        gettimeofday(&now, NULL);

        /*
         * Fold whole seconds out of the caller's tv_nsec first, so the sum
         * below stays within (-1e9, 2e9) and a single correction step is
         * always enough to normalise it.
         */
        abstimeout.tv_sec = now.tv_sec + timeout->tv_sec + timeout->tv_nsec / 1000000000L;
        nsec = (long)now.tv_usec * 1000L + timeout->tv_nsec % 1000000000L;
        if (nsec >= 1000000000L) {
            abstimeout.tv_sec++;
            nsec -= 1000000000L;
        } else if (nsec < 0) {
            abstimeout.tv_sec--;
            nsec += 1000000000L;
        }
        abstimeout.tv_nsec = nsec;
    }

    pthread_mutex_lock(&queue->mutex);

    /*
     * Loop to cope with spurious wake-ups and with other readers emptying the
     * queue first. Stop on ANY wait error, not just ETIMEDOUT, so a rejected
     * wait cannot turn into a busy loop.
     */
    while (queue->first == NULL && ret == 0) {
        if (timeout) {
            ret = pthread_cond_timedwait(&queue->cond, &queue->mutex, &abstimeout);
        } else {
            ret = pthread_cond_wait(&queue->cond, &queue->mutex);
        }
    }

    /*
     * Decide on the queue state rather than on ret: a message may have been
     * added while a timed-out waiter was re-acquiring the mutex, and in that
     * case it should be delivered instead of reporting a timeout.
     */
    if (queue->first == NULL) {
        pthread_mutex_unlock(&queue->mutex);
        return ret;
    }

    firstrec = queue->first;
    queue->first = queue->first->next;
    queue->length--;
    if (queue->first == NULL) {
        queue->last = NULL; /* safe: we hold the lock */
        queue->length = 0;
    }

    msg->data = firstrec->msg.data;
    msg->msgtype = firstrec->msg.msgtype;
    msg->qlength = queue->length;

    release_msglist(queue, firstrec);
    pthread_mutex_unlock(&queue->mutex);
    return 0;
}
//maybe caller should supply a callback for cleaning the elements ?
/*
 * Releases everything owned by the queue and destroys its mutex and
 * condition variable.
 *
 * All pending messages and all cached spare nodes are freed. When freedata
 * is non-zero, free() is also called on the data pointer of every pending
 * message. The list pointers and counters are reset so the structure holds
 * no dangling pointers afterwards.
 *
 * Returns EINVAL if queue is NULL; otherwise the result of
 * pthread_mutex_destroy, or, if that succeeded, the result of
 * pthread_cond_destroy.
 */
int thread_queue_cleanup(struct threadqueue *queue, int freedata)
{
    struct msglist *rec;
    struct msglist *next;
    int ret;
    int cond_ret;

    if (queue == NULL) {
        return EINVAL;
    }

    pthread_mutex_lock(&queue->mutex);

    /* Pending messages: optionally free the user payload along with the node. */
    for (rec = queue->first; rec != NULL; rec = next) {
        next = rec->next;
        if (freedata) {
            free(rec->msg.data);
        }
        free(rec);
    }

    /* Cached spare nodes: release_msglist clears their data pointer, so only the node is freed. */
    for (rec = queue->msgpool; rec != NULL; rec = next) {
        next = rec->next;
        free(rec);
    }

    /* Do not leave pointers to freed nodes behind. */
    queue->first = NULL;
    queue->last = NULL;
    queue->msgpool = NULL;
    queue->length = 0;
    queue->msgpool_length = 0;

    pthread_mutex_unlock(&queue->mutex);

    ret = pthread_mutex_destroy(&queue->mutex);
    cond_ret = pthread_cond_destroy(&queue->cond);

    /* Report the first failure instead of silently dropping the second result. */
    return ret != 0 ? ret : cond_ret;
}
/*
 * Reports how many messages are currently pending in the queue.
 *
 * The counter is sampled while holding the queue mutex; the value may be
 * outdated as soon as the mutex is released.
 */
long thread_queue_length(struct threadqueue *queue)
{
    long pending;

    pthread_mutex_lock(&queue->mutex);
    pending = queue->length; /* snapshot taken under the lock */
    pthread_mutex_unlock(&queue->mutex);

    return pending;
}

nos
- 223,662
- 58
- 417
- 506
-
Are there many situations in which `thread_queue_length()` can be used in a non-racy way? – caf Jan 02 '11 at 22:58
-
No. It depends on your needs: thread_queue_length gives you the queue length at the moment of the call, regardless of how many new messages were added or processed in the last nanosecond. I use it simply for periodic logging of the queue size. – nos Jan 03 '11 at 09:43
-
`add` and `get` both use the same mutex. If `get` acquires the mutex and is waiting on the `broadcast` from `add`, won't that result in a deadlock, since `add` couldn't lock the mutex? – Mike Jan 03 '11 at 21:36
-
2@Mike: No. The condition variable wait functions atomically unlock the mutex and wait; they then reacquire the mutex before returning. – caf Jan 04 '11 at 04:43
-
4When posting software, please post it with an explicit license so we know how it can be used. – Daniel Dec 07 '13 at 07:52
-
What is the reason for signalling with `pthread_cond_broadcast()` in `thread_queue_add()`? – class stacker Nov 25 '15 at 12:40
-
@ClassStacker So any consumer can wake up and fetch an element. – nos Nov 25 '15 at 13:03
-
Let me clarify. Why did you prefer your approach with `pthread_cond_broadcast()` over one with `pthread_cond_signal()`? – class stacker Nov 25 '15 at 13:35
-
@ClassStacker Mostly because this was a work in progress, and the next version I did of this queue performed its own tracking of the number of consumers and could therefore dispatch multiple consumers if the queue held several messages. But you're right, in this particular case pthread_cond_signal would be sufficient. – nos Nov 25 '15 at 13:48
-
But not efficient without another code change. ;) -- I was asking because I was interested in potential efficiency aspects of the possible solutions. Thanks! – class stacker Nov 25 '15 at 13:50
-
In most if not all implementations, doing a _broadcast when there's only a single thread to wake up is the same as doing a _signal, though. – nos Nov 25 '15 at 13:56
-
This and many others are here, as C++ templates: https://github.com/WigWagCo/twlib – EdH Mar 01 '17 at 23:07
7
Try APR queues. It's used by the apache web server and pretty well tested.

Missaka Wijekoon
- 883
- 7
- 10
-
4Great suggestion to use apr_queue, however, it's not so simple to integrate into your software if you don't want all of apr-util. Here's a version of apr_queue that just requires including one header and one C file: https://github.com/chrismerck/rpa_queue – Chris Merck Apr 26 '19 at 15:51
-
1Thanks for sharing this, @ChrisMerck. I will take a look at it when I need a solution in a memory-constrained environment. – Missaka Wijekoon Apr 27 '19 at 22:11
-
And thank you, Missaka, for the suggestion to look at APR Queues! It was not very difficult to strip out the apr-util stuff and get a well-tested but simple implementation. – Chris Merck Apr 28 '19 at 23:36