2

I wrote code with multiple producers and consumers using condition variables. It fails even with only one producer and one consumer. Both producers and consumers should run in a while(true) loop. When I run the code, it hangs in roughly 50% of runs. I suspect a deadlock caused by a wait that never returns. I have not managed to find where it gets stuck or how to release the waiting threads. As a requirement, I must implement this with wait, signal and broadcast.

If the queue is full, the producer waits. If the queue is empty, the consumer waits.

void WaitableQueue::enqueue(size_t a_item)
{
    (m_cond.getMutex()).lock();

    while(m_itemsCounter==m_capacity && !m_isBeingDestroyed)
    {
        ++m_numberOfWaiting;
        m_cond.wait();
        --m_numberOfWaiting;
    }

    std::cout<<"enqueue "<<a_item<<"\n";

    m_queue.push(a_item);
    ++m_itemsCounter;
    ++m_numbOfProduced;
    if(m_isBeingDestroyed)
    {
        m_cond.broadcast(); 
    }

    (m_cond.getMutex()).unlock();
    m_cond.broadcast();
}

void WaitableQueue::dequeue()
{
    (m_cond.getMutex()).lock();

    while(m_itemsCounter==0 && !m_isBeingDestroyed)
    {
        ++m_numberOfWaiting;
        std::cout<<"Waiting\n";
        m_cond.wait();
        std::cout<<"Done waiting\n";
        --m_numberOfWaiting;
    }

    if (m_isBeingDestroyed)
    {
        (m_cond.getMutex()).unlock();
        m_cond.broadcast();
        return;
    }
    std::cout<<"dequeue "<<m_queue.front()<<"\n";
    m_queue.pop();
    --m_itemsCounter;
    ++m_numbOfConsumed;
    (m_cond.getMutex()).unlock();
    m_cond.broadcast();
}

void WaitableQueue::destroy()
{
    (m_cond.getMutex()).lock();
    m_isBeingDestroyed=true;
    (m_cond.getMutex()).unlock();
}



void Producer::run()
{
    for(size_t i=0;i<m_numOfItemsToProduce;++i)
    {
        usleep(m_delay);
        size_t item=produce();
        m_wq.enqueue(item);
    }
}


size_t Producer::produce() const
{
    return rand()%m_numOfItemsToProduce;
}

void Consumer::run()
{
    m_numOfProducersMutex.lock();
    while(m_numOfProducers>0)
    {
        m_numOfProducersMutex.unlock();
        usleep(m_delay);
        m_wq.dequeue();
        m_numOfProducersMutex.lock();
    }
    m_numOfProducersMutex.unlock();
}


int main()
{
    size_t numProducers=1;
    size_t numConsumers=3;
    Mutex mutex;
    ConditionalVariable cond(mutex);

    WaitableQueue<size_t> wq(NUM_OF_ITEMS,cond);
    std::vector<Producer<size_t>*> producerArray;
    std::vector<Consumer<size_t>*> consumerArray;
    Mutex numOfProducersMutex;

    for(size_t i=0;i<numProducers;++i)
    {
        Producer<size_t>* tempP=new Producer<size_t>(wq,NUM_OF_ITEMS,DELAY);
        producerArray.push_back(tempP);
    }

    for(size_t i=0;i<numConsumers;++i)
    {
        Consumer<size_t>* tempC=new Consumer<size_t>(wq,numProducers,numOfProducersMutex,DELAY);
        consumerArray.push_back(tempC);
    }

    for(size_t i=0;i<numProducers;++i)
    {
        producerArray[i]->start();
    }

    for(size_t i=0;i<numConsumers;++i)
    {
        consumerArray[i]->start();
    }

    for(size_t i=0;i<numProducers;++i)
    {
        producerArray[i]->join();
        numOfProducersMutex.lock();
        --numProducers;
        numOfProducersMutex.unlock();
    }
    usleep(100);

    // tell the consumers to stop waiting
    wq.destroy();
    for(size_t i=0;i<numConsumers;++i)
    {
        consumerArray[i]->join();
    }

    for(size_t i=0;i<numProducers;++i)
    {
        delete producerArray[i];
    }

    for(size_t i=0;i<numConsumers;++i)
    {
        delete consumerArray[i];
    }
}

It works in around 50% of runs. In the other 50% it gets stuck.

Tomer Barak
  • Recommendation (that probably won't fix your current problem (but might fix a future one)): Don't lock and unlock the mutex yourself. Ensure that the mutex is unlocked with a [RAII](https://stackoverflow.com/questions/2321511/what-is-meant-by-resource-acquisition-is-initialization-raii) helper like [`std::lock_guard`](https://en.cppreference.com/w/cpp/thread/lock_guard) – user4581301 Oct 05 '19 at 21:54
  • Looking over the code a bit more, I think you should take a look at [the example here](https://en.cppreference.com/w/cpp/thread/condition_variable) that shows a better way to use condition variables. – user4581301 Oct 05 '19 at 22:00
  • You are not unlocking the mutex after locking it when you're waiting for empty/full queue. You need to release the mutex before you go waiting. If you debug the code, you will find that one thread is waiting on CV while locking the mutex, all while the other thread is waiting to get the mutex. Unlock mutex before you call CV.wait. This may have some race conditions so be careful. – Everyone Oct 05 '19 at 22:42
  • @Everyone Actually the mutex should always be locked before calling CV wait. In this case it looks like there is a custom class wrapper around CV that associates a mutex with the CV. (This is fine as long as you always protect the CV loop-waiting condition with that lock, which is the case here) – Humphrey Winnebago Oct 06 '19 at 03:42
  • Please provide the rest of the code. Note that if you pause the debugger while it's deadlocked, it will show you where the deadlock is. – Humphrey Winnebago Oct 06 '19 at 03:52
  • @HumphreyWinnebago by definition `std::mutex::lock` will block if other thread hasn't unlocked the mutex. So waiting for CV while holding the lock is asking for deadlock. You should release the lock to allow other thread to acquire it and signal the CV. This will create race condition if not done properly, but that can be easily dealt with later. – Everyone Oct 06 '19 at 04:38
  • _Calling this function if lock.mutex() is not locked by the current thread is undefined behavior._ (https://en.cppreference.com/w/cpp/thread/condition_variable/wait). CV wait is designed with the purpose of atomically releasing the mutex and blocking the waiting thread. It does the unlocking for you, thus it must be locked. The race condition created by unlocking before CV wait can cause a deadlock and cannot be dealt with later. See my explanation here https://stackoverflow.com/questions/50331130/please-explain-the-use-of-condition-variables-in-c-threads-and-why-do-we-need/50347715#50347715 – Humphrey Winnebago Oct 06 '19 at 05:39

2 Answers

-1

To solve the producer-consumer problem using a condition variable, you first need to understand the bounded buffer problem.

See this implementation of a thread-safe buffer queue using a condition variable in C++: https://codeistry.wordpress.com/2018/03/08/buffer-queue-handling-in-multithreaded-environment/

You can use this buffer queue as the building block for the multiple producer-consumer problem. This post shows how the thread-safe buffer queue is used to solve the producer-consumer problem in C++: https://codeistry.wordpress.com/2018/03/09/unordered-producer-consumer/

Ashish Khurange
-7

You have discovered another example of how C++ makes a difficult problem out of a conceptually simple problem.

It appears that you want one or more producers to produce an identical number of values and have a set of consumers read and process those values. It also appears that you want the number of producers to equal the number of consumers, while allowing that number (producers and consumers) to be configurable.

This problem is very simple using Ada, which was designed with concurrency in mind.

The first file is the Ada package specification defining our producer and consumer task types.

generic
   Items_To_Handle : Positive;
package Integer_Prod_Con is

   task type Producer;

   task type Consumer;

end Integer_Prod_Con;

The generic parameter is much like a template parameter. In this case the value passed as a generic parameter must be a positive integer. The implementation of the package follows.

with Ada.Containers.Synchronized_Queue_Interfaces;
with Ada.Containers.Unbounded_Synchronized_Queues;
with Ada.Text_Io; use Ada.Text_IO;

package body Integer_Prod_Con is
   package Int_Interface is new Ada.Containers.Synchronized_Queue_Interfaces(Integer);
   package Int_Queue is new Ada.Containers.Unbounded_Synchronized_Queues(Queue_Interfaces =>Int_Interface);
   use Int_Queue;

   The_Queue : Queue;

   --------------
   -- Producer --
   --------------

   task body Producer is
   begin
      for Num in 1..Items_To_Handle loop
         The_Queue.Enqueue(Num);
         delay 0.010;
      end loop;
   end Producer;

   --------------
   -- Consumer --
   --------------

   task body Consumer is
      Value : Integer;
   begin
      for Num in 1..Items_To_Handle loop
         The_Queue.Dequeue(Value);
         Put_Line(Value'Image);
         delay 0.010;
      end loop;
   end Consumer;

end Integer_Prod_Con;

The package employs a pre-defined generic package implementing an unbounded queue as the buffer. This allows the queue to grow and shrink as needed by the program. Each producer task enqueues the integer values from 1 through Items_To_Handle and each consumer dequeues and outputs the same number of elements from the queue.

The main procedure for this program is:

with Integer_Prod_Con;

procedure Int_Queue_Main is
   PC_Count : constant := 3;

   package short_list is new Integer_Prod_Con(10);
   use short_List;

   Producers : Array(1..PC_Count) of Producer;
   Consumers : Array(1..PC_Count) of Consumer;
begin
   null;
end Int_Queue_Main;

The output of this program is:

 1
 1
 1
 2
 2
 2
 3
 3
 3
 4
 4
 4
 5
 5
 5
 6
 6
 6
 7
 7
 7
 8
 8
 8
 9
 9
 9
 10
 10
 10
Jim Rogers
  • The question is tagged in C++, I do believe the OP wants a C++ answer. Also, this isn't "another example of how C++ makes a difficult problem." Concurrency needs understanding, if you do understand it then C++ implementation is straightforward. – Everyone Oct 06 '19 at 01:37
  • @Everyone - The question is also tagged in Producer-Consumer and was answered as an example of a proper implementation of a producer-consumer problem. Yes, concurrency does need understanding and C++ threading libraries do not facilitate such understanding. Placing lock/unlock pairs in the producer and consumer is faulty. They belong in the class methods for the shared buffer since they are behaviors of the buffer. Placing the locking calls outside buffer methods is a clear violation of encapsulation and cohesion of the buffer. – Jim Rogers Oct 06 '19 at 02:44
  • 4
    It is not acceptable to answer, "Just do it in some other programming language." The question is titled and tagged C++. It is usually not possible for the user to pick the language. Every language has different multithreading semantics, so an example from another language isn't helpful. – Humphrey Winnebago Oct 06 '19 at 04:27
  • 1
    Please give a justification for the claim that "C++ makes a difficult problem out of a conceptually simple problem." Are you saying ProdCons cannot be done elegantly in C++? While true that C++ comes with fewer built-in abstractions this is because it is not preferred to guess how the user wants to customize the performance. – Humphrey Winnebago Oct 06 '19 at 04:28
  • The need to think how to customize performance is in itself a symptom of premature optimization. That does not mean that it is wrong to provide the ability to customize performance. It does mean that there should be a default level of performance that takes no premature optimization, just as there is for other language constructs such as loops. It is common to use loops rather than manually unrolling loops for performance. Rather than providing an option for low level control of concurrency C++ forces all concurrency to a low level of control, which is seldom elegant. – Jim Rogers Oct 06 '19 at 05:31
  • Standard multithreading is relatively new in C++. (Props to Ada for having it standard 16 years before C++.) C++ may well have a standard Synchronized Queue eventually. Is it possible you're conflating language with library? The OP asked why their C++ Synchronized Queue is deadlocking and your answer is not to use C++ because someone already wrote a Synchronized Queue in Ada. It is a beautiful piece of code for sure, but I don't think it's fair to say we should avoid C++ for this. I'm sure Ada does the same things as C++ if you look at the implementation of Synchronized Queue. – Humphrey Winnebago Oct 06 '19 at 06:48
  • Ada does what C++ does and a bit more. The Ada synchronized queue implicitly provides all the locking and signalling semantics used in C++ with an added capability. Each method such as the dequeue method is implemented as an "entry" in Ada. Each entry has a condition, similar to a C++ condition variable, and its own entry queue. The entry queue queues tasks waiting for the condition to become true. The queue is either a FIFO queue or a priority queue ensuring deterministic behavior of tasks waiting to process the entry. – Jim Rogers Oct 06 '19 at 13:48
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/200479/discussion-between-jim-rogers-and-humphrey-winnebago). – Jim Rogers Oct 06 '19 at 18:49
  • @JimRogers Ada is not the only language that supports multithreading and multithreaded containers via language standard library. Without any benchmark proof your answer is as useful as any "hellothread" sample in Java / C# / Python and there are doubts if it could beat C++ performance that easy – MasterAler Oct 16 '20 at 21:45
  • @MasterAler Ada does not support multithreading via a language standard library. Those capabilities are part of the core language. One of the ways Ada can beat C++ is in handling an entry whose boundary condition becomes true. If one task performs an action causing an entry boundary condition to become true that task executes the call of the next task waiting in the queue and the result, if any, is passed to the waiting task, avoiding extra lock handling and context switching. This cannot be done through C++ libraries. – Jim Rogers Oct 17 '20 at 03:06