
I asked a previous question about producer/consumer code that was overly general (though the answers were certainly helpful). So I've taken the suggestions from an earlier SO question by another author and converted them to C++ and Boost. However, I'm always a bit concerned about multithreaded code, so if anyone can see any obvious improvements I'd love to hear about them.

#include <unistd.h>   // sleep()
#include <cassert>    // assert()
#include <deque>
#include <iostream>
#include <string>

#include "boost/thread.hpp"


class MyQueue
{
protected:
  boost::mutex mutex_;
  boost::condition_variable condition_;
  bool cancel_;
  std::deque<int> data_;

public:
  MyQueue() : mutex_(), condition_(), cancel_(false), data_()
  {
  }

  struct Canceled{};

  void push( int i )
  {
     boost::lock_guard<boost::mutex> l(mutex_);
     if(cancel_) throw Canceled();
     data_.push_back(i);
     condition_.notify_all();
  }

  void pop( int & i )
  {
     boost::unique_lock<boost::mutex> l(mutex_);
     while( !cancel_ && data_.empty() )
     {
        condition_.wait( l );
     }
     if(cancel_) throw Canceled();

     assert( data_.size() != 0 );
     i = data_.front();
     data_.pop_front();
  }

  void cancel()
  {
     boost::lock_guard<boost::mutex> l(mutex_);
     if( cancel_) throw Canceled();
     cancel_ = true;
     condition_.notify_all();
  }
};


boost::mutex iomutex;

void producer( MyQueue * q, const std::string & name )
try
{
  for(unsigned int i=0 ; i<20; ++i)
  {
    q->push( i );
    boost::lock_guard<boost::mutex> l(iomutex);
    std::cout<<name<<"  PRODUCED "<<i<<std::endl;
  }

  sleep(1);
  q->cancel();
  {
    boost::lock_guard<boost::mutex> l(iomutex);
    std::cout<<name<<"  PRODUCER EXITING NORMALLY"<<std::endl;
  }
}
catch( MyQueue::Canceled & c )
{
  boost::lock_guard<boost::mutex> l(iomutex);
  std::cout<<name<<"  PRODUCER CANCELED "<<std::endl;
}

void consumer( MyQueue * q, const std::string & name )
try
{
  while(true)
  {
    int i;
    q->pop( i );
    boost::lock_guard<boost::mutex> l(iomutex);
    std::cout<<name<<"  CONSUMED "<<i<<std::endl;
  }
}
catch( MyQueue::Canceled & c )
{
  boost::lock_guard<boost::mutex> l(iomutex);
  std::cout<<name<<"  CONSUMER CANCELED "<<std::endl;
}

int main()
{
  MyQueue q;
  boost::thread pr1( producer, &q, "pro1");
  boost::thread pr2( producer, &q, "pro2");
  boost::thread cons1( consumer, &q, "con1");
  boost::thread cons2( consumer, &q, "con2");

  pr1.join();
  pr2.join();
  cons1.join();
  cons2.join();
}

UPDATE: I ended up using a modified version of Anthony Williams' concurrent queue. My modified version can be found here.

Michael Anderson

1 Answer


If you're worried about potential pitfalls in your implementation, you can try using Anthony Williams' (maintainer of the Boost.Thread library) excellent thread-safe, multiple-producer, multiple-consumer queue.

tmatth
  • +1, Ah, I had come across this before. It lacks the ability to cancel the consumers. But I actually posted a modification that (I think) allows cancelling. I was a bit nervous about using a MT library from someone that I knew nothing about ... however as a respected thread library maintainer it seems likely that he knows what he's doing. – Michael Anderson Jul 20 '10 at 00:59
  • Since Williams' queue doesn't actually manage the producer or consumer threads, cancellation of consumers can just be done by having them check a flag before trying to pop the queue and/or interrupting them if needed. See http://www.boost.org/doc/libs/1_42_0/doc/html/thread/thread_management.html#thread.thread_management.interruption I would tend to keep that aspect of thread management independent of the queue. – tmatth Jul 20 '10 at 18:59
  • If we handle the cancellation outside of the queue, then the cancelling thread needs to have a handle to every possible reading thread. That sounds like a worse level of coupling to me. – Michael Anderson Jul 21 '10 at 06:11
  • This isn't necessarily the case. Say your cancelling thread just sets some flag that the other threads check periodically; it doesn't have to know anything about the other threads. – tmatth Jul 21 '10 at 20:44
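
The flag-based approach described in the last comment could look roughly like the sketch below. It is only an illustration under some assumptions: it uses the C++11 standard library (std::thread, std::condition_variable, std::atomic) in place of Boost so that it is self-contained, and PlainQueue, timed_pop, stop_requested and run_demo are hypothetical names, not part of Anthony Williams' queue or of the code in the question. The consumers use a timed wait so that they get a chance to re-check the flag periodically.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical queue with no cancellation logic of its own.
class PlainQueue
{
  std::mutex mutex_;
  std::condition_variable condition_;
  std::deque<int> data_;

public:
  void push( int i )
  {
    {
      std::lock_guard<std::mutex> l(mutex_);
      data_.push_back(i);
    }
    condition_.notify_one();
  }

  // Waits up to `timeout` for an item; returns false if none arrived in time.
  bool timed_pop( int & i, std::chrono::milliseconds timeout )
  {
    std::unique_lock<std::mutex> l(mutex_);
    if( !condition_.wait_for( l, timeout, [this]{ return !data_.empty(); } ) )
      return false;
    i = data_.front();
    data_.pop_front();
    return true;
  }
};

// Shared flag: the cancelling thread only touches this, never the consumers.
std::atomic<bool> stop_requested(false);

void consumer( PlainQueue * q, std::atomic<int> * consumed )
{
  // Re-check the flag before every pop attempt.
  while( !stop_requested.load() )
  {
    int i;
    if( q->timed_pop( i, std::chrono::milliseconds(10) ) )
      ++*consumed;
  }
}

// Pushes `count` items, waits until all of them have been consumed,
// then requests a stop. Returns the number of items consumed.
int run_demo( int count )
{
  stop_requested = false;
  PlainQueue q;
  std::atomic<int> consumed(0);
  std::thread c1( consumer, &q, &consumed );
  std::thread c2( consumer, &q, &consumed );

  for( int i = 0; i < count; ++i )
    q.push(i);

  while( consumed.load() < count )
    std::this_thread::sleep_for( std::chrono::milliseconds(1) );

  stop_requested = true;   // no handle to c1 or c2 is needed to cancel them
  c1.join();
  c2.join();
  return consumed.load();
}
```

Calling run_demo(20) from main starts two consumers, feeds them 20 items and shuts them down through the flag alone. The trade-offs compared with cancellation built into the queue: a consumer may take up to one timeout interval to notice the flag, and any items still queued when the flag is set are left unprocessed.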