
I'm in the process of porting some Java code over to C++, and one particular section makes use of a BlockingQueue to pass messages from many producers to a single consumer.

If you are not familiar with a Java BlockingQueue: it is simply a bounded queue that exposes thread-safe put() and take() methods. put() blocks if the queue is full, and take() blocks if the queue is empty. Timed versions of these methods are also supplied.

Timeouts are relevant to my use-case, so a recommendation that supplies those is ideal. If not, I can code up some myself.
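For concreteness, this is roughly the interface I have in mind (the names mirror Java's BlockingQueue; the exact signatures are just a sketch of what a port might expose, not an existing library):

#include <chrono>
#include <cstddef>

// Hypothetical interface only -- roughly what I'm after, not an existing library.
template <typename T>
class bounded_blocking_queue {
public:
    explicit bounded_blocking_queue(std::size_t capacity);

    void put(const T& value);   // blocks while the queue is full
    T take();                   // blocks while the queue is empty

    // Timed variants: give up and report failure once the timeout expires.
    bool offer(const T& value, std::chrono::milliseconds timeout);
    bool poll(T& out, std::chrono::milliseconds timeout);
};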

I've googled around and quickly browsed the Boost libraries and I'm not finding anything like this. Maybe I'm blind here...but does anyone know of a good recommendation?

Thanks!

Ben
  • A hand-made class that has an array (maybe a deque instead of an array, for easier pop_front/push_back) and a mutex? – NoSenseEtAl Oct 09 '12 at 17:21
  • is hard capacity really a requirement? – Karoly Horvath Oct 09 '12 at 18:28
  • In my case, yes. It's very possible that producers outpace consumers, and I need to either block threads on the producer side or otherwise reject their input, lest I run out of memory! – Ben Oct 09 '12 at 19:49

4 Answers


It isn't fixed-size and it doesn't support timeouts, but here is a simple implementation of a queue I posted recently, using C++11 constructs:

#include <mutex>
#include <condition_variable>
#include <deque>

template <typename T>
class queue
{
private:
    std::mutex              d_mutex;
    std::condition_variable d_condition;
    std::deque<T>           d_queue;
public:
    void push(T const& value) {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(value);
        }
        // notify outside the lock so the woken consumer can grab the mutex right away
        this->d_condition.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        // the predicate re-check guards against spurious wakeups
        this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
        T rc(std::move(this->d_queue.back()));
        this->d_queue.pop_back();
        return rc;
    }
};

It should be trivial to extend and use a timed wait for popping. The main reason I haven't done it is that I'm not happy with the interface choices I have thought of so far.
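For anyone who needs that timed wait, here is one possible shape, meant as an additional member of the class above; the bool-plus-output-parameter interface is just one of the options alluded to, and `<chrono>` must be included for the duration type:

    // Possible timed variant of pop(): returns false if nothing arrived in time.
    template <typename Rep, typename Period>
    bool pop(T& out, std::chrono::duration<Rep, Period> timeout) {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        if (!this->d_condition.wait_for(lock, timeout,
                                        [this]{ return !this->d_queue.empty(); })) {
            return false;   // timed out while the queue stayed empty
        }
        out = std::move(this->d_queue.back());
        this->d_queue.pop_back();
        return true;
    }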

Dietmar Kühl
  • Is the scoping in push necessary? I guess you are trying to unlock the mutex... but I'm not sure of the prerequisites for notify_one. – NoSenseEtAl Oct 10 '12 at 08:20
  • The scope in `push()` isn't necessary but without it the condition variable is signaled while the lock is still held. Releasing the lock prior to signaling makes the lock readily available. – Dietmar Kühl Oct 10 '12 at 10:01
  • Could someone extend this example for a `timed wait for popping`? – Oleg Vazhnev Sep 27 '13 at 06:45
  • Here I posted similar code for review; some of the comments there may be helpful: http://codereview.stackexchange.com/questions/49820/thread-safe-queue – Oleg Vazhnev Aug 28 '14 at 11:35
  • How many consumers and producers does this implementation support? – Oleg Vazhnev Aug 28 '14 at 12:25
  • @javapowered: given that locks are being used, it doesn't really care! There is probably a limit on how many threads can wait on a lock, but I can't imagine that this would be relevant. – Dietmar Kühl Aug 28 '14 at 15:01
  • Is it a matter of style to use **d_** or **m_** as a prefix for member variables, or are there some semantics in this choice? – Isaac Jan 04 '16 at 11:57
  • @Isaac: It is a matter of style. In different organizations different indicators for member variables are used. In different organizations I have used nothing, a `m_`-prefix, a `d_`-prefix, a `_`-suffix, and an [ill-advised] `_`-prefix. Currently, I'm working in an organization where a `d_`-prefix is used. – Dietmar Kühl Jan 04 '16 at 18:09
  • Would it make sense to use `condition_variable::wait_for` instead of `condition_variable::wait` ? – nurettin Jan 13 '22 at 13:01
  • @nurettin: depends on your needs: if you want/need to be able to bail out after some time, e.g., to shut down the system, wait_for() together with handling of the timeout may be reasonable. Normally I'd enqueue function objects and I would shut down via a corresponding function object being enqueued, i.e., there wouldn't be a need for dealing with timeouts. – Dietmar Kühl Jan 13 '22 at 13:13
  • A small question regarding the lock: let's say the pop method is blocked at wait(); that means the mutex is locked, so no thread will be able to push into the queue and we are deadlocked. Am I missing something? – Nitzanu Sep 18 '22 at 09:27
  • @Nitzanu: yes, you are missing that the defining feature of a condition variable is that it releases the lock and puts the thread to sleep without losing a signal. In particular, the lock will be released by `wait()`. This is the reason why a `unique_lock` is used rather than a `lock_guard`, as the latter doesn't have an interface to release the lock without destruction. When `wait()` returns it will reacquire the lock. It is important to realize that the lock is released during the `wait()`, as anything else the lock protects may also change. – Dietmar Kühl Sep 18 '22 at 18:02

Here's an example of a blocking queue with a shutdown-request feature:

#include <condition_variable>
#include <mutex>
#include <queue>

template <typename T> class BlockingQueue {
  std::condition_variable _cvCanPop;
  std::mutex _sync;
  std::queue<T> _qu;
  bool _bShutdown = false;

public:
  void Push(const T& item)
  {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _qu.push(item);
    }
    _cvCanPop.notify_one();
  }

  void RequestShutdown() {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _bShutdown = true;
    }
    _cvCanPop.notify_all();
  }

  // Returns false once shutdown has been requested and the queue is drained.
  bool Pop(T &item) {
    std::unique_lock<std::mutex> lock(_sync);
    for (;;) {
      if (_qu.empty()) {
        if (_bShutdown) {
          return false;
        }
      }
      else {
        break;
      }
      _cvCanPop.wait(lock);
    }
    item = std::move(_qu.front());
    _qu.pop();
    return true;
  }
};
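For illustration, a minimal usage sketch of the shutdown pattern (the thread counts and integer payload are arbitrary): the single consumer loops until Pop() returns false, and RequestShutdown() is called once the producers are done.

#include <iostream>
#include <thread>
#include <vector>

int main() {
  BlockingQueue<int> queue;

  // Single consumer: drains items until shutdown is requested and the queue is empty.
  std::thread consumer([&queue] {
    int item;
    while (queue.Pop(item)) {
      std::cout << "consumed " << item << "\n";
    }
  });

  // A few producers.
  std::vector<std::thread> producers;
  for (int p = 0; p < 3; ++p) {
    producers.emplace_back([&queue, p] {
      for (int i = 0; i < 5; ++i) {
        queue.Push(p * 100 + i);
      }
    });
  }

  for (auto& t : producers) {
    t.join();
  }
  queue.RequestShutdown();   // lets Pop() return false once the queue drains
  consumer.join();
}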
Serge Rogatch

First, write a semaphore class:

#ifndef SEMAPHORE_H
#define SEMAPHORE_H
#include <mutex>
#include <condition_variable>

// A simple counting semaphore built from a mutex and a condition variable.
class semaphore {
public:
    explicit semaphore(int count = 0)
        : count(count),
          m(),
          cv()
    {
    }

    // Block until a unit is available, then consume it.
    void await() {
        std::unique_lock<std::mutex> lk(m);
        // the predicate protects against spurious wakeups
        cv.wait(lk, [this] { return count > 0; });
        --count;
    }

    // Release one unit and wake a single waiter.
    void post() {
        {
            std::lock_guard<std::mutex> lk(m);
            ++count;
        }
        cv.notify_one();
    }

private:
    int count;
    std::mutex m;
    std::condition_variable cv;
};

#endif // SEMAPHORE_H

Now the blocked_queue can use the semaphore to implement blocking put() and take():

#ifndef BLOCKED_QUEUE_H
#define BLOCKED_QUEUE_H
#include <list>
#include <mutex>
#include "semaphore.h"

template <typename T>
class blocked_queue {
public:
    explicit blocked_queue(int count)
        : s_products(),
          s_free_space(count),
          li()
    {
    }

    void put(const T &t) {
        s_free_space.await();              // wait for a free slot
        {
            // the mutex protects the list against concurrent producers/consumers
            std::lock_guard<std::mutex> lk(m);
            li.push_back(t);
        }
        s_products.post();                 // announce a new item
    }

    T take() {
        s_products.await();                // wait for an item
        std::unique_lock<std::mutex> lk(m);
        T res = li.front();
        li.pop_front();
        lk.unlock();
        s_free_space.post();               // announce a free slot
        return res;
    }

private:
    semaphore s_products;
    semaphore s_free_space;
    std::mutex m;
    std::list<T> li;
};

#endif // BLOCKED_QUEUE_H
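A short usage sketch, assuming the two headers above are saved as semaphore.h and blocked_queue.h (the capacity of 4 and the item counts are arbitrary):

#include <iostream>
#include <thread>
#include <vector>
#include "blocked_queue.h"

int main() {
    blocked_queue<int> q(4);   // producers block once 4 items are waiting

    // Many producers, as in the original use case.
    std::vector<std::thread> producers;
    for (int p = 0; p < 3; ++p) {
        producers.emplace_back([&q, p] {
            for (int i = 0; i < 5; ++i) {
                q.put(p * 10 + i);
            }
        });
    }

    // Single consumer takes exactly as many items as were produced.
    std::thread consumer([&q] {
        for (int i = 0; i < 15; ++i) {
            std::cout << q.take() << "\n";
        }
    });

    for (auto& t : producers) t.join();
    consumer.join();
}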


OK, I'm a bit late to the party, but I think this is a better fit for Java's BlockingQueue. Here I too use one mutex and two condition variables, one for not-full and one for not-empty. IMO a BlockingQueue makes more sense with a limited capacity, which I didn't see in the other answers. I include a simple test scenario too:

#include <iostream>
#include <algorithm>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <vector>

template<typename T>
class blocking_queue {
private:
    size_t _capacity;
    std::queue<T> _queue;
    mutable std::mutex _mutex;   // mutable so the const size()/empty() methods can lock it
    std::condition_variable _not_full;
    std::condition_variable _not_empty;

public:
    inline blocking_queue(size_t capacity) : _capacity(capacity) {
        // empty
    }

    inline size_t size() const {
        std::unique_lock<std::mutex> lock(_mutex);
        return _queue.size();
    }

    inline bool empty() const {
        std::unique_lock<std::mutex> lock(_mutex);
        return _queue.empty();
    }

    inline void push(const T& elem) {
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // wait while the queue is full
            while (_queue.size() >= _capacity) {
                _not_full.wait(lock);
            }
            std::cout << "pushing element " << elem << std::endl;
            _queue.push(elem);
        }
        _not_empty.notify_all();
    }

    inline void pop() {
        {
            std::unique_lock<std::mutex> lock(_mutex);

            // wait while the queue is empty
            while (_queue.size() == 0) {
                _not_empty.wait(lock);
            }
            std::cout << "popping element " << _queue.front() << std::endl;
            _queue.pop();
        }
        _not_full.notify_one();
    }

    inline const T& front() {
        std::unique_lock<std::mutex> lock(_mutex);

        // wait while the queue is empty
        while (_queue.size() == 0) {
            _not_empty.wait(lock);
        }
        return _queue.front();
    }
};

int main() {
    blocking_queue<int> queue(5);

    // create producers
    std::vector<std::thread> producers;
    for (int i = 0; i < 10; i++) {
        producers.push_back(std::thread([&queue, i]() {
            queue.push(i);
            // produces too fast
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }));
    }

    // create consumers
    std::vector<std::thread> consumers;
    for (int i = 0; i < 10; i++) {
        consumers.push_back(std::thread([&queue, i]() {
            queue.pop();
            // consumes too slowly
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }));
    }

    std::for_each(producers.begin(), producers.end(), [](std::thread &thread) {
        thread.join();
    });

    std::for_each(consumers.begin(), consumers.end(), [](std::thread &thread) {
        thread.join();
    });

    return EXIT_SUCCESS;
}
SkyWalker
  • Even though the STL queue works like this as well (you have to use front() to access the first element and then pop() it afterwards), I don't think this kind of interface works well in a multithreaded scenario, since there's no way for you to pop the first element and get its value in one operation. If you do front() then pop(), you may remove a different element than the one you just obtained. Or am I missing something here? – Gui Meira Sep 26 '19 at 14:02
  • Very true, it could be easily extended to do a `pop2` or `frontAndPop` which would fuse these into one and cover the use-case where atomic thread-safe access is necessary, instead of a two-step approach. – SkyWalker Sep 26 '19 at 14:43
  • For the `size()` and `empty()` methods, do you really need to obtain a lock? It seems that you will deadlock in this case – Saher Ahwal Jan 10 '23 at 00:06
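Following up on the comments above, here is a rough sketch of such a fused method for this blocking_queue; the name pop_front() and the move-out-then-notify structure are my own choices, not part of the answer:

    // Sketch of a fused operation: waits, then reads and removes the front element
    // while still holding the lock, so no other thread can pop in between.
    inline T pop_front() {
        std::unique_lock<std::mutex> lock(_mutex);
        while (_queue.empty()) {
            _not_empty.wait(lock);
        }
        T elem = std::move(_queue.front());
        _queue.pop();
        lock.unlock();
        _not_full.notify_one();
        return elem;
    }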