
I have been tasked with modifying a synchronous C program so that it can run in parallel. It is an open source program that many people use, so the goal is to keep it as portable as possible. For that reason I wrapped the program in a C++ layer so that I could take advantage of the portable Boost libraries. I have already done this and everything seems to work as expected.

The problem I am having is deciding on the best approach for passing messages between the threads. Luckily, the program's architecture is multiple producer, single consumer. Even better, the order of the messages is not important. I have read that single-producer/single-consumer (SPSC) queues would benefit from this architecture. Does anyone experienced with multi-threaded programming have any advice? I'm quite new to this. Any code examples using Boost to implement SPSC would also be greatly appreciated.

walrii
grouma
  • See accepted answer to http://stackoverflow.com/questions/8918401/does-a-multiple-producer-single-consumer-lock-free-queue-exist-for-c – walrii Aug 04 '12 at 01:47

3 Answers


Below is the technique I used for my cooperative multi-tasking / multi-threading library, MACE (http://bytemaster.github.com/mace/). It is lock-free except when the queue is empty, in which case the consumer takes a mutex and blocks on a condition variable.

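#include <boost/atomic.hpp>
#include <boost/function.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>

// Each task carries its own link, so the queue itself needs no storage.
struct task {
   boost::function<void()> func;
   task* next;
};


boost::mutex                     task_ready_mutex;
boost::condition_variable        task_ready;
boost::atomic<task*>             task_in_queue;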

// this can be called from any thread
void thread::post_task( task* t ) {
     // atomically post the task to the queue.
     task* stale_head = task_in_queue.load(boost::memory_order_relaxed);
     do { t->next = stale_head;
     } while( !task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) );

   // Because only one thread can post the 'first task', only that thread will attempt
   // to acquire the lock and therefore there should be no contention on this lock except
   // when the consumer thread is about to block on a wait condition.
    if( !stale_head ) { 
        boost::unique_lock<boost::mutex> lock(task_ready_mutex);
        task_ready.notify_one();
    }
}

// this is the consumer thread.
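void process_tasks() {
  // 'done' is the consumer's shutdown flag, declared elsewhere.
  while( !done ) {
   // this will atomically pop everything that has been posted so far.
   task* pending = task_in_queue.exchange(0,boost::memory_order_consume);
   // pending is a linked list in 'reverse post order', so process them
   // from tail to head if you want to maintain order.

   if( !pending ) { // lock scope
      boost::unique_lock<boost::mutex> lock(task_ready_mutex);
      // check one last time while holding the lock before blocking.
      if( !task_in_queue ) task_ready.wait( lock );
   }

   // run whatever was popped (here in reverse post order).
   while( pending ) {
      task* next = pending->next; // read the link before the task may be released
      pending->func();            // who frees the task is up to the caller
      pending = next;
   }
  }
}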
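The comment about 'reverse post order' can be made concrete with a small sketch. This is not the MACE code: it assumes C++11 `std::atomic` in place of `boost::atomic`, and `node`, `post`, `take_all_in_post_order` and `demo_order` are hypothetical names used only for illustration. It shows the same push-with-CAS and pop-everything-with-exchange idea, plus a reversal step for when order does matter.

```cpp
#include <atomic>
#include <cassert>
#include <vector>

// Hypothetical node type for this sketch; the answer's task holds a functor instead.
struct node {
    int   value;
    node* next;
};

std::atomic<node*> head{nullptr};

// Safe to call from any producer thread: push onto the shared list.
void post(node* n) {
    node* stale = head.load(std::memory_order_relaxed);
    do {
        n->next = stale;
    } while (!head.compare_exchange_weak(stale, n,
                                         std::memory_order_release,
                                         std::memory_order_relaxed));
}

// Consumer only: detach everything posted so far, then reverse the list
// so that the result is in post order (oldest first).
node* take_all_in_post_order() {
    node* pending = head.exchange(nullptr, std::memory_order_acquire);
    node* ordered = nullptr;
    while (pending) {
        node* next = pending->next;
        pending->next = ordered;
        ordered = pending;
        pending = next;
    }
    return ordered;
}

// Single-threaded demonstration of the ordering: post 1, 2, 3, then drain.
std::vector<int> demo_order() {
    for (int i = 1; i <= 3; ++i) post(new node{i, nullptr});
    std::vector<int> seen;
    for (node* n = take_all_in_post_order(); n != nullptr; ) {
        node* next = n->next;
        seen.push_back(n->value);
        delete n;
        n = next;
    }
    return seen;
}
```

The demo runs on one thread only to keep the result deterministic; `post` is written so that several producer threads could call it concurrently. Blocking when the list is empty is left out of the sketch.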
bytemaster
  • ..though +1 for using a link inside each message to avoid message storage within the queue. – Martin James Aug 04 '12 at 07:52
  • An interesting and simple implementation. Two notes: (a) semantics of mutex with nothing but notify_one() in it looks suspicious (nothing is required to change if we move notify_one() outside of the mutex, and then mutex becomes empty, i.e. it is not really required to do anything meaningful). (b) the logic about "the first thread" seems only to stand if we're not considering this first thread to lock for long enough until task_in_queue goes to non-zero and then back to zero. Most likely, both problems can be solved by adding a mutex-protected counter of "number_of_pending_notifications". – No-Bugs Hare Jul 20 '17 at 15:11

There are many examples of producer-consumer queues on the net, safe for multiple producers/consumers. @bytemaster posted one that uses a link inside each message to eliminate storage in the queue class itself - that's a fine approach, I use it myself on embedded jobs.

Where the queue class must provide storage, I usually go with a 'pool queue' of size N, loaded up with N pointers to message class instances at startup. A thread that needs to communicate pops a message pointer from the pool, fills the message in and queues it on. When the message is eventually 'used up', its pointer is pushed back onto the pool. This caps the number of messages, so all queues need only be of length N: no resizing, no new, no delete, easy leak detection.
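A minimal sketch of that pool idea, under stated assumptions: the names `message`, `pointer_queue` and `demo_pool` are hypothetical, the queue is a plain mutex-protected container rather than anything lock-free, and `try_pop` returns a null pointer instead of blocking when the queue is empty.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

struct message { int payload; };   // hypothetical message type

// Mutex-protected container of message pointers, used for both the pool
// and the work queue. Capacity is reserved up front, so it never resizes
// as long as at most N pointers are in circulation.
class pointer_queue {
public:
    explicit pointer_queue(std::size_t capacity) { items_.reserve(capacity); }

    void push(message* m) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(m);
    }
    // Returns nullptr when empty (a simplification; a real queue might block).
    message* try_pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return nullptr;
        message* m = items_.back();
        items_.pop_back();
        return m;
    }
    std::size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return items_.size();
    }
private:
    std::mutex mutex_;
    std::vector<message*> items_;
};

int demo_pool() {
    const std::size_t N = 4;
    std::vector<message> storage(N);         // every message is allocated once
    pointer_queue pool(N), work(N);
    for (auto& m : storage) pool.push(&m);   // load the pool at startup

    // Producer side: take a message from the pool, fill it in, queue it.
    int sent = 0;
    while (message* m = pool.try_pop()) {    // stops once the pool is exhausted
        m->payload = ++sent;
        work.push(m);
    }
    // Consumer side: use each message, then hand it back to the pool.
    int sum = 0;
    while (message* m = work.try_pop()) {
        sum += m->payload;
        pool.push(m);
    }
    assert(pool.size() == N);                // leak check: all N are back
    return sum;
}
```

Because the pool starts with exactly N pointers, a pool that does not return to size N once the system is idle points at a leaked message.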

Martin James
  • I simplified the code for the answer, but in reality every 'task' is storing a functor of 'unknown size' (to avoid boost::function<>'s heap alloc). Furthermore, I let the task double as a reference counted promise that is kept alive as long as a future holds it. I end up with 1 malloc per task and can push 2.4 million sync post/wait ops per second. – bytemaster Aug 04 '12 at 14:46

If there is only a single consumer but multiple producers, then I would use an array, or some array-like data structure with O(1) access time, where each slot holds one single-producer/single-consumer (SPSC) queue. The great advantage of an SPSC queue is that it can be made lock-free: the producer and the consumer each own one index, so atomic loads and stores are enough and no mutex is required, which makes it a very fast data structure in a multi-threaded environment. See my answer here for a bare-bones implementation of a single-producer/single-consumer queue.
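The layout described above might be sketched as follows. This is an illustration under assumptions, not the linked answer's code: `spsc_ring`, `inboxes`, `kProducers`, `drain_all` and `demo_inboxes` are hypothetical names, the ring is a fixed-size circular buffer, and the producer count is fixed at compile time.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>

// Bounded single-producer/single-consumer ring. One slot is kept empty to
// tell "full" from "empty", so it holds at most Capacity - 1 items.
template <typename T, std::size_t Capacity>
class spsc_ring {
public:
    bool try_push(const T& v) {                 // producer thread only
        std::size_t t = tail_.load(std::memory_order_relaxed);
        std::size_t next = (t + 1) % Capacity;
        if (next == head_.load(std::memory_order_acquire)) return false;  // full
        slots_[t] = v;
        tail_.store(next, std::memory_order_release);   // publish the item
        return true;
    }
    bool try_pop(T& out) {                      // consumer thread only
        std::size_t h = head_.load(std::memory_order_relaxed);
        if (h == tail_.load(std::memory_order_acquire)) return false;     // empty
        out = slots_[h];
        head_.store((h + 1) % Capacity, std::memory_order_release);  // free the slot
        return true;
    }
private:
    std::array<T, Capacity> slots_{};
    std::atomic<std::size_t> head_{0};   // written only by the consumer
    std::atomic<std::size_t> tail_{0};   // written only by the producer
};

// One ring per producer; producer p writes only to inboxes[p].
const std::size_t kProducers = 3;        // hypothetical fixed producer count
std::array<spsc_ring<int, 8>, kProducers> inboxes;

// Consumer: one pass over the inboxes, draining each in turn.
int drain_all() {
    int sum = 0, v = 0;
    for (auto& q : inboxes)
        while (q.try_pop(v)) sum += v;
    return sum;
}

// Stand-in for three producer threads, run sequentially so the result
// is deterministic: producer p pushes p*10+1 .. p*10+5.
int demo_inboxes() {
    for (std::size_t p = 0; p < kProducers; ++p)
        for (int i = 1; i <= 5; ++i)
            inboxes[p].try_push(static_cast<int>(p) * 10 + i);
    return drain_all();
}
```

In real use each producer would run on its own thread and the consumer would call `drain_all` in a loop. The fixed `kProducers` and the fixed ring capacity are exactly the sizing decisions this design forces.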

Community
Jason
  • I used this technique before adopting the atomic solution below. The problems I had were that it didn't scale, that the buffers consumed memory while 'idle', and that if you don't know in advance which threads might be posting (the common case), you must either resize dynamically (via a lock) or hardcode a 'large' maximum thread count. – bytemaster Aug 04 '12 at 02:34
  • I was thinking of using a circular queue ... that way there is no need to re-size the queue. – Jason Aug 04 '12 at 04:17
  • that is the 'fixed size queue' that can fill up, but if you have N threads then each thread either needs one input for all other N threads, or the first time a new thread attempts to communicate with another it must allocate its own single producer/ single consumer queue. This is the resize vs hard code issue. – bytemaster Aug 04 '12 at 04:21