I have created a generic message queue for use in a multi-threaded application. Specifically, single producer, multi-consumer. Main code below.
1) I wanted to know if I should pass a shared_ptr allocated with new into the enqueue method by value, or is it better to have the queue wrapper allocate the memory itself and just pass in a genericMsg object by const reference?
2) Should I have my dequeue method return a shared_ptr, have a shared_ptr passed in as a parameter by reference (current strategy), or just have it directly return a genericMsg object?
3) Will I need signal/wait in enqueue/dequeue or will the read/write locks suffice?
4) Do I even need to use shared_ptrs? Or will this depend solely on the implementation I use? I like that the shared_ptrs will free memory once all references are no longer using the object. I can easily port this to regular pointers if that's recommended, though.
5) I'm storing a pair here because I'd like to discriminate what type of message I'm dealing with else w/o having to do an any_cast. Every message type has a unique ID that refers to a specific struct. Is there a better way of doing this?
Generic Message Type:
template<typename Message_T>
class genericMsg
{
public:
genericMsg()
{
id = 0;
size = 0;
}
genericMsg (unsigned int &_id, unsigned int &_size, Message_T &_data)
{
id = _id;
size = _size;
data = _data;
}
~genericMsg()
{}
unisgned int id;
unsigned int size;
Message_T data; //All structs stored here contain only POD types
};
Enqueue Methods:
// ----------------------------------------------------------------
// -- Thread safe function that adds a new genericMsg object to the
// -- back of the Queue.
// -----------------------------------------------------------------
template<class Message_T>
inline void enqueue(boost::shared_ptr< genericMsg<Message_T> > data)
{
WriteLock w_lock(myLock);
this->qData.push_back(std::make_pair(data->id, data));
}
VS:
// ----------------------------------------------------------------
// -- Thread safe function that adds a new genericMsg object to the
// -- back of the Queue.
// -----------------------------------------------------------------
template<class Message_T>
inline void enqueue(const genericMsg<Message_T> &data_in)
{
WriteLock w_lock(myLock);
boost::shared_ptr< genericMsg<Message_T> > data =
new genericMsg<Message_T>(data_in.id, data_in.size, data_in.data);
this->qData.push_back(std::make_pair(data_in.id, data));
}
Dequeue Method:
// ----------------------------------------------------------------
// -- Thread safe function that grabs a genericMsg object from the
// -- front of the Queue.
// -----------------------------------------------------------------
template<class Message_T>
void dequeue(boost::shared_ptr< genericMsg<Message_T> > &msg)
{
ReadLock r_lock(myLock);
msg = boost::any_cast< boost::shared_ptr< genericMsg<Message_T> > >(qData.front().second);
qData.pop_front();
}
Get message ID:
inline unsigned int getMessageID()
{
ReadLock r_lock(myLock);
unsigned int tempID = qData.front().first;
return tempID;
}
Data Types:
std::deque < std::pair< unsigned int, boost::any> > qData;
Edit:
I have improved upon my design. I now have a genericMessage base class that I directly subclass from in order to derive the unique messages.
Generic Message Base Class:
class genericMessage
{
public:
virtual ~genericMessage() {}
unsigned int getID() {return id;}
unsigned int getSize() {return size;}
protected:
unsigned int id;
unsigned int size;
};
Producer Snippet:
boost::shared_ptr<genericMessage> tmp (new derived_msg1(MSG1_ID));
theQueue.enqueue(tmp);
Consumer Snippet:
boost::shared_ptr<genericMessage> tmp = theQueue.dequeue();
if(tmp->getID() == MSG1_ID)
{
boost::shared_ptr<derived_msg1> tObj = boost::dynamic_pointer_cast<derived_msg1>(tmp);
tObj->printData();
}
New Queue:
std::deque< boost::shared_ptr<genericMessage> > qData;
New Enqueue:
void mq_class::enqueue(const boost::shared_ptr<genericMessage> &data_in)
{
boost::unique_lock<boost::mutex> lock(mut);
this->qData.push_back(data_in);
cond.notify_one();
}
New Dequeue:
boost::shared_ptr<genericMessage> mq_class::dequeue()
{
boost::shared_ptr<genericMessage> ptr;
{
boost::unique_lock<boost::mutex> lock(mut);
while(qData.empty())
{
cond.wait(lock);
}
ptr = qData.front();
qData.pop_front();
}
return ptr;
}
Now, my question is am I doing dequeue correctly? Is there another way of doing it? Should I pass in a shared_ptr as a reference in this case to achieve what I want?