4

This is a simple program which has a function start() which waits for user to enter something(using infinite loop) and stores it in queue. start() runs in a separate thread. After user enters some value, the size of queue remains zero in main. How can the queue be synchronized?
code: source.cpp

#include <iostream>
#include "kl.h"

using namespace std;

int main()
{
    std::thread t1(start);
    while (1)
    {
        if (q.size() > 0)
        {
            std::cout << "never gets inside this if\n";
            std::string first = q.front();
            q.pop();
        }           
    }
    t1.join();
}

code: kl.h

#include <queue>
#include <iostream> 
#include <string>

void start();
static std::queue<std::string> q;

code: kl.cpp

#include "kl.h"
using namespace std;

void start()
{
    char i;
    string str;
    while (1)
    {
        for (i = 0; i <= 1000; i++)
        {
            //other stuff and str input
            q.push(str);
        }

    }
}
user6275035
  • 91
  • 2
  • 9
  • added full code – user6275035 Sep 23 '16 at 14:31
  • 1
    Because you use 'static' on the queue in the header file, you actually have 2 different queues, one in each cpp file. That's why the one in main is always empty. – tony Sep 23 '16 at 19:00
  • @tony It shows linker errors on removing static. – user6275035 Sep 23 '16 at 19:05
  • Yep, I suspect it does. Try extern. And then `std::queue q;` in main.cpp. The h file says "this thing exists, somewhere", the cpp file says "it is here". Better yet, make it static in main, _remove_ it from the header, and pass it by reference into the thread function. – tony Sep 23 '16 at 21:39
  • @tony Thanks! this one worked too – user6275035 Sep 24 '16 at 04:15
  • boost has a synchronized queue at https://www.boost.org/doc/libs/1_76_0/doc/html/thread/sds.html#thread.sds.synchronized_queues . there is also a lock-free queue in boost. – fuzzyTew Jun 30 '21 at 15:28

3 Answers3

2

Your code contains a race - by me it crashed; both threads are potentially modifying a shared queue. (Also, you're looping with char i for values up to 1000 - not a good idea, probably.)

You should protect your shared queue with a std::mutex, and use a std::condition_variable to notify that there is a reason to check the queue.

Specifically, you should consider the following (which is very common for your case of a producer consumer):

  1. Access the queue only when holding the mutex.

  2. Use the condition variable to notify that you've pushed something into it.

  3. Use the condition variable to specify a condition on when there's a point to continue processing.

Here is a rewrite of your code:

#include <iostream>
#include <queue>
#include <thread>
#include <condition_variable>
#include <mutex>

using namespace std;

std::queue<std::string> q;
std::mutex m;
std::condition_variable cv;

void start()
{
    string str;
    for (std::size_t i = 0; i <= 1000; i++) {
        //other stuff and str input
        std::cout << "here" << std::endl;
        std::unique_lock<std::mutex> lk(m);
        q.push(str);
        lk.unlock();
        cv.notify_one();
    }
}

int main()
{
    std::thread t1(start);
    for (std::size_t i = 0; i <= 1000; i++)
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return !q.empty();});
        std::string first = q.front();
        q.pop();    
    }
    t1.join();
}
Community
  • 1
  • 1
Ami Tavory
  • 74,578
  • 11
  • 141
  • 185
  • @ami-travory could you elaborate on what this line is doing and the sintax? `cv.wait(lk, []{return !q.empty();});` – Paul Oct 07 '21 at 04:08
0

My synced queue class example and its usage:

template<typename T>
class SyncQueue
{
    std::queue<T> m_Que;
    std::mutex m_Lock;
    std::condition_variable m_ConVar;

public:
    void enque(T item)
    {
        std::unique_lock<std::mutex> lock(m_Lock);
        m_Que.push(item);
        lock.unlock();
        m_ConVar.notify_all();
    }

    T deque()
    {
        std::unique_lock<std::mutex> lock(m_Lock);

        do
        {
            m_ConVar.wait(lock);

        } while(m_Que.size() == 0); // extra check from spontaneous notifications

        auto ret = m_Que.front();
        m_Que.pop();

        return ret;
    }
};

int main()
{
    using namespace std::chrono_literals;

    SyncQueue<int> sq;

    std::thread consumer([&sq]()
    {
        std::cout << "consumer" << std::endl;

        for(;;)
        {
            std::cout << sq.deque() << std::endl;
        }
    });

    std::thread provider([&sq]()
    {
        std::this_thread::sleep_for(1s);
        sq.enque(1);
        std::this_thread::sleep_for(3s);
        sq.enque(2);
        std::this_thread::sleep_for(5s);
        sq.enque(3);
    });

    consumer.join();

    return 0;
}
Mykola Khyliuk
  • 1,234
  • 1
  • 9
  • 16
0
/*  Here I have a code snippate with Separate class for 
    Producing and Consuming along with buffer class */


#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <deque>
#include <vector>
using namespace std;
mutex _mutex_1,_mutex_2;
condition_variable cv;

template <typename T>
class Queue
{
    deque<T> _buffer;
    const unsigned int max_size = 10;
public:
    Queue() = default;
    void push(const T& item)
    {
        while(1)
        {
            unique_lock<mutex> locker(_mutex_1);
            cv.wait(locker,[this](){ return _buffer.size() < max_size; });
            _buffer.push_back(item);
            locker.unlock();
            cv.notify_all();
            return;
        }
    }

    T pop()
    {
        while(1)
        {
            unique_lock<mutex> locker(_mutex_1);
            cv.wait(locker,[this](){ return _buffer.size() > 0; });
            int back = _buffer.back();
            _buffer.pop_back();
            locker.unlock();
            cv.notify_all();
            return back;
        }
    }
};



class Producer
{
    Queue<int>* _buffer;

public:
    Producer(Queue<int>* _buf)
    {
        this->_buffer = _buf;
    }
    void run()
    {
        while(1)
        {
            auto num = rand()%100;
            _buffer->push(num);
            _mutex_2.lock();
            cout<<"Produced:"<<num<<endl;
            this_thread::sleep_for(std::chrono::milliseconds(50));
            _mutex_2.unlock();
        }
    }
};


class Consumer
{
    Queue<int>* _buffer;

public:
    Consumer(Queue<int>* _buf)
    {
        this->_buffer = _buf;
    }
    void run()
    {
        while(1)
        {
            auto num = _buffer->pop();
            _mutex_2.lock();
            cout<<"Consumed:"<<num<<endl;
            this_thread::sleep_for(chrono::milliseconds(50));
            _mutex_2.unlock();
        }
    }
};



void client()
{

    Queue<int> b;
    Producer p(&b);
    Consumer c(&b);
    thread producer_thread(&Producer::run, &p);
    thread consumer_thread(&Consumer::run, &c);
    producer_thread.join();
    consumer_thread.join();
}


int main()
{
    client();

    return 0;
}