0

I have created a generic message queue for use in a multi-threaded application. Specifically, single producer, multi-consumer. Main code below.

1) I wanted to know if I should pass a shared_ptr allocated with new into the enqueue method by value, or is it better to have the queue wrapper allocate the memory itself and just pass in a genericMsg object by const reference?

2) Should I have my dequeue method return a shared_ptr, have a shared_ptr passed in as a parameter by reference (current strategy), or just have it directly return a genericMsg object?

3) Will I need signal/wait in enqueue/dequeue or will the read/write locks suffice?

4) Do I even need to use shared_ptrs? Or will this depend solely on the implementation I use? I like that the shared_ptrs will free memory once all references are no longer using the object. I can easily port this to regular pointers if that's recommended, though.

5) I'm storing a pair here because I'd like to discriminate what type of message I'm dealing with else w/o having to do an any_cast. Every message type has a unique ID that refers to a specific struct. Is there a better way of doing this?

Generic Message Type:

template<typename Message_T>
class genericMsg
{ 
  public:
    genericMsg()
    {
       id = 0;
       size = 0;
    }

    genericMsg (unsigned int &_id, unsigned int &_size, Message_T &_data)
    {
       id = _id;
       size = _size;
       data = _data;
    }

    ~genericMsg()
    {}

    unisgned int   id;
    unsigned int   size;
    Message_T      data; //All structs stored here contain only POD types
};

Enqueue Methods:

  // ----------------------------------------------------------------
  // -- Thread safe function that adds a new genericMsg object to the
  // -- back of the Queue.
  // -----------------------------------------------------------------
  template<class Message_T>
  inline void enqueue(boost::shared_ptr< genericMsg<Message_T> > data)
  {
     WriteLock w_lock(myLock);
     this->qData.push_back(std::make_pair(data->id, data));
  }

VS:

  // ----------------------------------------------------------------
  // -- Thread safe function that adds a new genericMsg object to the
  // -- back of the Queue.
  // -----------------------------------------------------------------
  template<class Message_T>
  inline void enqueue(const genericMsg<Message_T> &data_in)
  {
     WriteLock w_lock(myLock);
     boost::shared_ptr< genericMsg<Message_T> > data = 
          new genericMsg<Message_T>(data_in.id, data_in.size, data_in.data);
     this->qData.push_back(std::make_pair(data_in.id, data));
  }

Dequeue Method:

  // ----------------------------------------------------------------
  // -- Thread safe function that grabs a genericMsg object from the
  // -- front of the Queue.
  // -----------------------------------------------------------------
  template<class Message_T>
  void dequeue(boost::shared_ptr< genericMsg<Message_T> > &msg)
  {
     ReadLock r_lock(myLock);
     msg = boost::any_cast< boost::shared_ptr< genericMsg<Message_T> > >(qData.front().second);
     qData.pop_front();
  }

Get message ID:

  inline unsigned int getMessageID()
  {
     ReadLock r_lock(myLock);
     unsigned int tempID = qData.front().first;
     return tempID;
  }

Data Types:

 std::deque < std::pair< unsigned int, boost::any> > qData;

Edit:

I have improved upon my design. I now have a genericMessage base class that I directly subclass from in order to derive the unique messages.

Generic Message Base Class:

class genericMessage
{
   public:
      virtual ~genericMessage() {}
      unsigned int getID() {return id;}
      unsigned int getSize() {return size;}

   protected:
      unsigned int id;
      unsigned int size;   
};

Producer Snippet:

 boost::shared_ptr<genericMessage> tmp (new derived_msg1(MSG1_ID));
 theQueue.enqueue(tmp);

Consumer Snippet:

 boost::shared_ptr<genericMessage> tmp = theQueue.dequeue();         
 if(tmp->getID() == MSG1_ID)
 {
    boost::shared_ptr<derived_msg1> tObj = boost::dynamic_pointer_cast<derived_msg1>(tmp);
    tObj->printData();
 }

New Queue:

  std::deque< boost::shared_ptr<genericMessage> > qData;

New Enqueue:

void mq_class::enqueue(const boost::shared_ptr<genericMessage> &data_in)
{
   boost::unique_lock<boost::mutex> lock(mut);
   this->qData.push_back(data_in);
   cond.notify_one();
}

New Dequeue:

boost::shared_ptr<genericMessage> mq_class::dequeue()
{
   boost::shared_ptr<genericMessage> ptr;
   {
      boost::unique_lock<boost::mutex> lock(mut);
      while(qData.empty())
      {
         cond.wait(lock);
      }
      ptr = qData.front();
      qData.pop_front();
   }
   return ptr;
}

Now, my question is am I doing dequeue correctly? Is there another way of doing it? Should I pass in a shared_ptr as a reference in this case to achieve what I want?

demarr
  • 108
  • 9

3 Answers3

1

Edit (I added answers for parts 1, 2, and 4).

1) You should have a factory method that creates new genericMsgs and returns a std::unique_ptr. There is absolutely no good reason to pass genericMsg in by const reference and then have the queue wrap it in a smart pointer: Once you've passed by reference you have lost track of ownership, so if you do that the queue is going to have to construct (by copy) the entire genericMsg to wrap.

2) I can't think of any circumstance under which it would be safe to take a reference to a shared_ptr or unique_ptr or auto_ptr. shared_ptrs and unique_ptrs are for tracking ownership and once you've taken a reference to them (or the address of them) you have no idea how many references or pointers are still out there expecting the shared_ptr/unique_ptr object to contain a valid naked pointer.

unique_ptr is always preferred to a naked pointer, and is preferred to a shared_ptr in cases where you only have a single piece of code (validly) pointing to an object at a time.

3) Yes, you need to use a std::condition_variable in your dequeue function. You need to test whether qData is empty or not before calling qData.front() or qData.pop_front(). If qData is empty you need to wait on a condition variable. When enqueue inserts an item it should signal the condition variable to wake up anyone who may have been waiting.

Your use of reader/writer locks is completely incorrect. Don't use reader/writer locks. Use std::mutex. A reader lock can only be used on a method that is completely const. You are modifying qData in dequeue, so a reader lock will lead to data races there. (Reader writer locks are only applicable when you have stupid code that is both const and holds locks for extended period of time. You are only keeping the lock for the period of time it takes to insert or remove from the queue, so even if you were const the added overhead of reader/writer locks would be a net lose.)

An example of implementing a (bounded) buffer using mutexes and condition_variables can be found at: Is this a correct way to implement a bounded buffer in C++.

4) unique_ptr is always preferred to naked pointers, and usually preferred to shared_ptr. (The main exception where shared_ptr might be better is for graph-like data structures.) In cases like yours where you are reading something in on side, creating a new object with a factory, moving the ownership to the queue and then moving ownership out of the queue to the consumer it sounds like you should be using unique_ptr.

5) You are reinventing tagged unions. Virtual functions were added to c++ specifically so you wouldn't need to do this. You should subclass your messages from a class that has a virtual function called do_it() (or better yet, operator()() or something like that). Then instead of tagging each struct, make each struct a subclass of your message class. When you dequeue each struct (or ptr to struct) just call do_it() on it. Strong static typing, no casts. See C++ std condition variable covering a lot of share variables for an example.

Also: if you are going to stick with the tagged unions: you can't have separate calls to get the id and the data item. Consider: If thread A calls to get the id, then thread B calls to get the id, then thread B retrieves the data item, now what happens when thread A calls to retrieve a data item? It gets a data item, but not with the type that it expected. You need to retrieve the id and the data item under the same critical section.

Community
  • 1
  • 1
Wandering Logic
  • 3,323
  • 1
  • 20
  • 25
  • Okay, thanks for the details, I appreciate the response! My only issue with subclassing is that we have 100s of messages and I was hoping something more generic would be way more portable. _you can't have separate calls to get the id and the data item._ Hmm, hadn't thought of this, this will definitely be an issue as I'll need to know what type of message we have in order to cast it. I'll defintiely have to re think my strategy. – demarr Apr 30 '13 at 01:12
  • What is not portable about subclassing? Even the oldest c++ compilers support it. (More compilers than support templates.) Perhaps you meant something else? – Wandering Logic Apr 30 '13 at 12:02
  • Sorry, I meant portable as in generic enough to be usable in other, similar applications. We have many distributed applications that talk to each other using these types of middleware messages. – demarr Apr 30 '13 at 14:05
  • I see. You are correct that you can not pass pointers between processes in a distributed application. But c++ inter-thread synchronization doesn't work across processes, only inside a single process. You might look into [Boost's `serialization` interface](http://www.boost.org/doc/libs/1_53_0/libs/serialization/doc/index.html) for passing the data structures between processes. You use the serialization at the process boundaries (while reading/writing files/sockets/pipes) and then in the rest of your program you don't need to know about it. – Wandering Logic Apr 30 '13 at 15:20
  • Yeah, we already have a solution that handles message routing between apps, albeit it's legacy, but we can't change that. Our typical application takes legacy messages that it subscribes to, throws them onto a queue, and processes and translates them into other types of data that are passed along. I was trying to come up with a general solution that would kind of act as a front end to this process for all future applications, since they all have to take in legacy messages. The only difference between the applications are what messages they need to subscribe to and how they're processed. – demarr Apr 30 '13 at 16:19
  • Okay, so I've decided to go the route that you suggested (subclassing). I'd like it to return the exact message type for each derived class with something like getData(). I looked into covariant return types, is there a better way of doing this? Should I make a virtual method called clone in my base class that returns the derived class and then call the derived classes getData() method to use the actual data? Is this overkill? – demarr Apr 30 '13 at 20:30
  • This is getting way outside my range of expertise, so take this with a grain of salt. (I don't know what a covariant return type is, for example.) Since you need to have different `do_it()` routines for each object type in different applications then it sounds like what you really want is a visitor pattern: http://stackoverflow.com/questions/255214/when-should-i-use-the-visitor-design-pattern. – Wandering Logic Apr 30 '13 at 21:34
  • I've updated my current strategy based on your sub classing suggestion. My only issue now is whether I actually need smart pointers or if I should just stick w/ regular pointers. If they're okay to use in this context, did I implement my dequeue method properly? I'm not sure if creating a temporary shared_ptr w/ in dequeue is the correct way to go about doing this. I tried to make it so front and pop_front are in the same critical section, but returning the stored shared_ptr proved tricky. Is it better to pass in a reference to a shared_ptr in this case? – demarr May 01 '13 at 05:18
  • I updated my answer to discuss smart ptrs. Short answer: use unique_ptr everywhere and don't use regular ptrs (or references) at all. – Wandering Logic May 01 '13 at 13:35
1

First of all, it's better to use 3rd-party concurrency containers than to implement them yourself, except it's for education purpose.

Your messages doesn't look to have costly constructors/destructor, so you can store them by value and forget about all your other questions. Use move semantics (if available) for optimizations.

If your profiler says "by value" is bad idea in your particular case:

I suppose your producer creates messages, puts them into your queue and loses any interest in them. In this case you don't need shared_ptr because you don't have shared ownership. You can use unique_ptr or even a raw pointer. It's implementation details and better to hide them inside the queue.

From performance point of view, it's better to implement lock-free queue. "locks vs. signals" depends completely on your application. For example, if you use thread pool and kind of a scheduler it's better to allow your clients to do something useful while queue is full/empty. In simpler cases reader/writer lock is just fine.

Andriy Tylychko
  • 15,967
  • 6
  • 64
  • 112
  • I disagree about lock-free queue being better. (1) The consumers here are a thread pool. If the queue is empty that means _there is nothing for the threads to do_ so better have them block than spin wasting cpu. (2) The jury is out on the conditions under which lock-free queues are better. Usually they are just a premature optimization. See http://stackoverflow.com/questions/5680869/are-lock-free-algorithms-really-more-performant-than-their-lock-full-counterpart for an interesting discussion. – Wandering Logic Apr 29 '13 at 20:48
0

If I want to be thread safe, I usually use const objects and modify only on copy or create constructor. In this way you don't need to use any lock mechanism. In a threaded system, it is usually more effective than use mutexes on a single instance.

In your case only deque would need lock.

Naszta
  • 7,560
  • 2
  • 33
  • 49