
Is this the correct way to implement a thread-safe queue in C++? I have multiple threads constantly enqueuing and dequeuing items until a certain condition occurs, at which point I "stop" the queue. Thanks.

#include <queue>
#include <pthread.h>

template <typename T>
class ThreadSafeQueue {

private:
    std::queue<T> _queue;
    pthread_mutex_t queueMutex;
    pthread_cond_t emptyCondVar;

public:
    ThreadSafeQueue();

    bool volatile Stopped;

    void Enqueue(T data);
    T Dequeue();
    void StopQueue();
    void DestroyQueue();
};

template <typename T>
ThreadSafeQueue<T>::ThreadSafeQueue() {
    pthread_mutex_init(&queueMutex, NULL);
    pthread_cond_init(&emptyCondVar, NULL);
    Stopped = false;
}

template <typename T>
void ThreadSafeQueue<T>::Enqueue(T data) {
    pthread_mutex_lock(&queueMutex);
    _queue.push(data);
    pthread_cond_signal(&emptyCondVar);
    pthread_mutex_unlock(&queueMutex);

}

template <typename T>
T ThreadSafeQueue<T>::Dequeue() {

    pthread_mutex_lock(&queueMutex);
    if (_queue.empty()) {
        pthread_cond_wait(&emptyCondVar, &queueMutex);
    }
    if (Stopped) {
        pthread_mutex_unlock(&queueMutex);
        return NULL;
    }

    T elem = _queue.front();
    _queue.pop();
    pthread_mutex_unlock(&queueMutex);
    return elem;
}

template <typename T>
void ThreadSafeQueue<T>::StopQueue() {  
    pthread_mutex_lock(&queueMutex);
    Stopped = true;
    pthread_cond_broadcast(&emptyCondVar);
    pthread_mutex_unlock(&queueMutex);
}

template <typename T>
void ThreadSafeQueue<T>::DestroyQueue() {   
    pthread_mutex_lock(&queueMutex);
    _queue = std::queue<T>();
    pthread_mutex_unlock(&queueMutex);
}
P.P
Minh Tri Pham
  • Better to use the C++ standard functionality, such as [`std::mutex`](http://en.cppreference.com/w/cpp/thread/mutex) and so on. – πάντα ῥεῖ Jun 30 '15 at 17:50
  • This question might also be appropriate for [Code Review](https://codereview.stackexchange.com/), especially if you're after a more detailed discussion of various aspects of your code. – 5gon12eder Jun 30 '15 at 17:55
  • And no, it isn't entirely correct regardless. You never recheck the empty predicate in `Dequeue` once signalled on your cvar. You assume that because you received a wakeup the queue must be non-empty, which is not guaranteed, especially with [spurious wakeups](http://stackoverflow.com/questions/8594591/why-does-pthread-cond-wait-have-spurious-wakeups). – WhozCraig Jun 30 '15 at 18:33
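
The suggestions in the comments (standard-library primitives, rechecking the predicate after a wakeup) could be combined roughly as follows. This is only a sketch assuming C++11 is available; the class name `StdThreadSafeQueue`, the member names, and the `bool Dequeue(T& out)` signature are hypothetical and not part of the original code. The predicate overload of `std::condition_variable::wait` rechecks the condition after every wakeup, so spurious wakeups are handled.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical name: a sketch of the queue built on C++11 primitives.
template <typename T>
class StdThreadSafeQueue {
public:
    void Enqueue(T data) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(data));
        }
        notEmpty_.notify_one();  // wake one waiting consumer
    }

    // Blocks until an item is available or the queue has been stopped.
    // Returns false and leaves `out` untouched once the queue is stopped,
    // even if items remain (same behaviour as the code in the question).
    bool Dequeue(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate is re-evaluated after every wakeup.
        notEmpty_.wait(lock, [this] { return stopped_ || !queue_.empty(); });
        if (stopped_) {
            return false;
        }
        out = std::move(queue_.front());
        queue_.pop();
        return true;
    }

    void StopQueue() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        notEmpty_.notify_all();  // release every blocked consumer
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable notEmpty_;
    bool stopped_ = false;  // only accessed while holding mutex_
};
```

Returning a `bool` and writing through a reference avoids `return NULL`, which only compiles when `T` can be constructed from a null constant.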

1 Answer


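Your `Dequeue` needs to loop on `pthread_cond_wait()` and recheck the condition after every wakeup, because a wakeup (spurious or not) does not guarantee that the queue is non-empty: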

while (_queue.empty() && !Stopped) {
    pthread_cond_wait(&emptyCondVar, &queueMutex);
}
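
For context, here is a minimal sketch of how that loop might sit in the whole class, keeping the pthread calls and member names from the question. The `bool Dequeue(T& out)` signature, the destructor, and the lowercase private `stopped` flag are assumptions made for illustration, not part of the original code.

```cpp
#include <pthread.h>
#include <queue>

template <typename T>
class ThreadSafeQueue {
public:
    ThreadSafeQueue() : stopped(false) {
        pthread_mutex_init(&queueMutex, NULL);
        pthread_cond_init(&emptyCondVar, NULL);
    }

    // Assumed addition: release the pthread objects when the queue dies.
    ~ThreadSafeQueue() {
        pthread_cond_destroy(&emptyCondVar);
        pthread_mutex_destroy(&queueMutex);
    }

    // Copying a pthread mutex or condition variable is not meaningful.
    ThreadSafeQueue(const ThreadSafeQueue&) = delete;
    ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;

    void Enqueue(T data) {
        pthread_mutex_lock(&queueMutex);
        _queue.push(data);
        pthread_cond_signal(&emptyCondVar);
        pthread_mutex_unlock(&queueMutex);
    }

    // Returns false once the queue is stopped; otherwise stores the
    // front element in `out` and returns true.
    bool Dequeue(T& out) {
        pthread_mutex_lock(&queueMutex);
        // Recheck both conditions after every wakeup.
        while (_queue.empty() && !stopped) {
            pthread_cond_wait(&emptyCondVar, &queueMutex);
        }
        if (stopped) {
            pthread_mutex_unlock(&queueMutex);
            return false;
        }
        out = _queue.front();
        _queue.pop();
        pthread_mutex_unlock(&queueMutex);
        return true;
    }

    void StopQueue() {
        pthread_mutex_lock(&queueMutex);
        stopped = true;
        pthread_cond_broadcast(&emptyCondVar);  // wake all waiters
        pthread_mutex_unlock(&queueMutex);
    }

private:
    std::queue<T> _queue;
    pthread_mutex_t queueMutex;
    pthread_cond_t emptyCondVar;
    bool stopped;  // read and written only while holding queueMutex
};
```

Because the flag is only touched while the mutex is held, it does not need to be `volatile` in this sketch. A consumer thread would typically run `while (q.Dequeue(item)) { ... }` and exit when `Dequeue` returns false.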
caf