0

I am writing a multithreaded program in C++ and in my main thread I am waiting for my other threads to put packages in different queues. Depending on the sort of package and from which thread they originate.

The queues are protected by mutexes as it should be.

But in my main I don't want to be doing:

while(true)
if(!queue1->empty)
{
     do stuff
}
if(!queue2->empty)
{
     do stuff
}
etc

So you need to use condition variables to signal the main that something has changed. Now I can only block on 1 condition variable so I need all these threads to use the same conditional variable and an accompanying mutex. Now I dont want to really use this mutex to lock all my threads. It doesn't mean that when 1 thread is writing to a queue, another cant write to a totally different queue. So I use seperate mutexes for each queue. But now how do I use this additional mutex that comes with the conditional variable.

How it's done with 2 threads and 1 queue using boost, very simular to std. http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html :

template<typename Data>
class concurrent_queue
{
    private:
    boost::condition_variable the_condition_variable;  
    public:
    void wait_for_data()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
             the_condition_variable.wait(lock);
        }
    }
    void push(Data const& data)
    {
         boost::mutex::scoped_lock lock(the_mutex);
         bool const was_empty=the_queue.empty();
         the_queue.push(data);
         if(was_empty)
         {
             the_condition_variable.notify_one();
         }
     }
     // rest as before
 };

So how do you solve this?

Silver
  • 1,075
  • 3
  • 12
  • 37

3 Answers3

3

I'd say the key to your problem is here:

Now I dont want to really use this mutex to lock all my threads. It doesn't mean that when 1 thread is writing to a queue, another cant write to a totally different queue. So I use seperate mutexes for each queue.

Why? Because:

... packages come in relatively slow. And queues are empty most of the time

It seems to me that you've designed yourself into a corner because of something you thought you needed when in reality you probably didn't need it because one queue would have actually worked in the usage scenario you mention.

I'd say start off with one queue and see how far it gets you. Then, when you run into a limitation where you really do have many threads waiting on a single mutex, you will have a lot more information about the problem and will therefore be able to solve it better.

In essence, I'd say the reason you're facing this problem is premature design optimization and the way to fix that would be to back track and change the design right now.

Carl
  • 43,122
  • 10
  • 80
  • 104
  • The problem with one queue is that packages are objects of different classes. Depending on the type they originate in different thread and also require a different handling in the main thread. Also main makes packages and sends them to threads. I am communicating with zigbee trough a serial port. I have a webservice running that accepts HTTP packages and Im a client to a webservice that runs some sort of exotic database where I put data. So 1 queue wont work cause packets or objects from different classes. And I don't want to be casting them the entire time. (or should I?) – Silver Mar 22 '13 at 13:41
  • 1
    You could encode the type as a numeric field, which would do away with the problem of casting. – Carl Mar 22 '13 at 21:34
  • Idd I could do that but I didn't feel entirely comfortable doing so. General rule of thumb is not too cast if not necessary. But I guess in this case it might be necessary. – Silver Mar 24 '13 at 13:41
  • You don't _have_ to cast in order to put different types in the same queue - in some cases you can just use runtime polymorphism. Typical examples are `Task` subclasses that know how to "run" themselves, or the visitor pattern. – Useless Mar 24 '13 at 18:24
  • The problem is that the child classes have different functions that do not exist in the parent. Making all the functions pure virtual in the parent makes no sense since some children will not at all support them. + you will get a huge list of virtual function in the parent. So just polymorphism will not suffice. I am going to write a class that contains a queue of packages and a mutex and a typeidentifier for the package so that I can cast them back to the orignal. All casting will be done inside the class and I will just make different setters and getters per type of packet. Thx! – Silver Mar 25 '13 at 14:02
  • The other thing you could look at is http://www.boost.org/doc/libs/1_53_0/doc/html/any.html – Carl Mar 25 '13 at 19:19
2

Create a top-level (possibly circular) queue of all the queues that have work in them.

This queue can be protected by a single mutex, and have a condvar which only needs to be signalled when it changes from empty to non-empty.

Now, your individual queues can each have their own mutex, and they only need to touch the shared/top-level queue (and its mutex) when they change from empty to non-empty.

Some details will depend on whether you want your thread to take only the front item from each non-empty queue in turn, or consume each whole queue in sequence, but the idea is there.


Going from non-empty to non-empty (but increased size) should also be passed on to the top level queue?

That depends, as I said, on how you consume them. If, every time a queue has something in it, you do this:

  1. (you already have the top-level lock, that's how you know this queue has something in it)
  2. lock the queue
  3. swap the queue contents with a local working copy
  4. remove the queue from the top-level queue
  5. unlock the queue

then a work queue is always either non-empty, and hence in the top-level queue, or empty and hence not in the queue.

If you don't do this, and just pull the front element off each non-empty queue, then you have more states to consider.


Note that if

... packages come in relatively slow. And queues are empty most of the time

you could probably just have one queue, since there wouldn't be enough activity to cause a lot of contention. This simplifies things enormously.

Useless
  • 64,155
  • 6
  • 88
  • 132
  • well packages come in relatively slow. And queues are empty most of the time. So then every time a packages comes in I still need to access the top level queue. Also I don't see why the top level queue only needs to be locked when individual queues go from empty to non-empty. Going from non-empty to non-empty (but increased size) should also be passed on to the top level queue? I think I will probably copy the items in the shared queues to a local queue in my main. Because the processing can take some place. So emptying the queues will go relatively fast. Just queue.front() and queue.pop(). – Silver Mar 21 '13 at 22:32
  • locking all queues wouldn't make my program that bad but it's part of a thesis and I want to optimize it as much as possible. Could I just use a mutex that locks and is used for my condition variable but that doesn't lock any queues. Or do you think this will cause problems somewhere? – Silver Mar 21 '13 at 22:37
0

Both @Carleeto and @Useless have given good answers. You have only a single consumer, so a single queue will give you the best performance. You can't get higher throughput than the single consumer working constantly, so your objective should be to minimize the locking overhead of the single consumer, not the producers. You do this by having the producer wait on a single condition variable (indicating that the queue is non-empty) with a single mutex.

Here's how you do the parametric polymorphism. Complete type safety, no casting, and only a single virtual function in the parent class:

class ParentType {
public:
  virtual void do_work(...[params]...)=0;
  virtual ~ParentType() {}
};

class ChildType1 : public ParentType {
private:
  // all my private variables and functions
public:
  virtual void do_work(...[params]...) {
    // call private functions and use private variables from ChildType1
  }
};

class ChildType2: public ParentType {
private:
  // completely different private variables and functions
public:
  virtual void do-work(...[params]...) {
    // call private functions and use private variables from ChildType2
  }
};
Wandering Logic
  • 3,323
  • 1
  • 20
  • 25