3

I have an application where producers and consumers ("clients") want to send broadcast messages to each other, i.e. an n:m relationship. Any of them could be a different program, so they are separate processes and not threads.

To reduce the n:m to something more maintainable I was thinking of introducing a small central server. That server would offer a socket that each client connects to.
And each client would send a new message through that socket to the server - resulting in 1:n.

The server would also offer a shared memory region that is read-only for the clients. It would be organized as a ring buffer where new messages are added by the server and overwrite older ones.
This would give the clients some time to process a message - but if a client is too slow, that's bad luck; the message wouldn't be relevant anymore anyway...

The advantage I see in this approach is that I avoid synchronisation as well as unnecessary data copying and buffer hierarchies. The central buffer should be enough, shouldn't it?

That's the architecture so far - I hope it makes sense...

Now to the more interesting aspect of implementing that:
The index of the newest element in the ring buffer is a variable in shared memory, and the clients would just have to wait until it changes. Instead of a stupid `while( central_index == my_last_processed_index ) { /* do nothing */ }` I want to free CPU resources, e.g. by using `pthread_cond_wait()`.

But that needs a mutex that I think I don't need - on the other hand, "Why do pthreads' condition variable functions require a mutex?" gave me the impression that I'd better ask whether my architecture makes sense and could be implemented like that...
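For reference, the mutex-based wait would look roughly like the following. This is only a minimal sketch: `struct shm_header`, `header_init()`, `header_publish()` and `header_wait()` are hypothetical names, and it assumes the platform supports `PTHREAD_PROCESS_SHARED` attributes and that the struct lives at the start of the shared memory region.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical header placed at the start of the shared memory region. */
struct shm_header {
    pthread_mutex_t lock;
    pthread_cond_t  changed;
    uint32_t        newest;   /* index of the newest message in the ring */
};

/* Server side: initialise the process-shared mutex and condition variable.
 * Returns 0 on success or a pthread error number. */
int header_init(struct shm_header *h)
{
    pthread_mutexattr_t ma;
    pthread_condattr_t ca;
    int rc;

    assert(h != NULL);
    if ((rc = pthread_mutexattr_init(&ma)) != 0)
        return rc;
    rc = pthread_mutexattr_setpshared(&ma, PTHREAD_PROCESS_SHARED);
    if (rc == 0)
        rc = pthread_mutex_init(&h->lock, &ma);
    pthread_mutexattr_destroy(&ma);
    if (rc != 0)
        return rc;

    if ((rc = pthread_condattr_init(&ca)) != 0)
        return rc;
    rc = pthread_condattr_setpshared(&ca, PTHREAD_PROCESS_SHARED);
    if (rc == 0)
        rc = pthread_cond_init(&h->changed, &ca);
    pthread_condattr_destroy(&ca);
    if (rc != 0)
        return rc;

    h->newest = 0;
    return 0;
}

/* Server side: store the new index and wake every waiting client. */
void header_publish(struct shm_header *h, uint32_t newest)
{
    pthread_mutex_lock(&h->lock);
    h->newest = newest;
    pthread_cond_broadcast(&h->changed);
    pthread_mutex_unlock(&h->lock);
}

/* Client side: sleep until the index differs from last_seen, then return it.
 * The loop re-checks the index after every wakeup, including spurious ones. */
uint32_t header_wait(struct shm_header *h, uint32_t last_seen)
{
    uint32_t newest;

    pthread_mutex_lock(&h->lock);
    while (h->newest == last_seen)
        pthread_cond_wait(&h->changed, &h->lock);
    newest = h->newest;
    pthread_mutex_unlock(&h->lock);
    return newest;
}
```

Note that locking the mutex writes to it, so with this variant the clients would need write access at least to the part of the shared memory that holds the header.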

Can you give me a hint if all of that makes sense and could work?

(Side note: the client programs could also be written in the common scripting languages like Perl and Python. So the communication with the server has to be recreated there and thus shouldn't be too complicated or even proprietary)

Chris
  • To be clear here: the messages are of broadcast type, i.e. a "consumer" doesn't "consume" the message, it just reads it - all "consumers" have to read all messages! – Chris Aug 04 '12 at 11:36

3 Answers

3

If memory serves, the reason for the mutex accompanying a condition variable is that under POSIX, signalling the condition variable may cause the kernel to wake up more than one waiter (and `pthread_cond_broadcast()` wakes all of them). In these circumstances, the first thing that consumer threads need to do is check that there is something to consume - by means of accessing a variable shared between producer and consumer threads. The mutex protects against concurrent access to the variable used for this purpose. This of course means that if there are many consumers, up to n-1 of them are needlessly awoken.

Having implemented precisely the arrangement described above, I can say that the choice of IPC object to use is not obvious. We were buffering audio between high-priority real-time threads in separate processes, and didn't want to block the consumer. As the audio was produced and consumed in real time, we were already getting scheduled regularly on both ends, and if there wasn't anything to consume (or space to produce into) we trashed the data because we'd already missed the deadline.

In the arrangement you describe, you will need a mutex to prevent the consumers concurrently consuming items that are queued (and believe me, on a lightly loaded SMP system, they will). However, you don't need to have the producer contend on this as well.

I don't understand your comment about the consumer having read-only access to the shared memory. In the classic lockless ring buffer implementation, the producer writes the queue tail pointer and the consumer(s) the head - whilst all parties need to be able to read both. You might of course arrange for the queue head and tail to be in a different shared memory region to the queue data itself.

Also be aware that there is a theoretical data coherency hazard on SMP systems when implementing a ring buffer such as this - namely that write-back to memory of the queue content with respect to the head or tail pointer may occur out of order (they sit in cache - usually per CPU core). There are other variants on this theme to do with synchronization of caches between CPUs. To guard against these, you need memory barriers (load and store barriers) to enforce ordering. See Memory Barrier on Wikipedia. You explicitly avoid this hazard by using kernel synchronisation primitives such as mutexes and condition variables.

The C11 atomic operations can help with this.
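As a rough illustration, here is a minimal sketch of a single-writer broadcast ring using C11 atomics, under the assumption (taken from the question and its comments) that the producer never waits for readers and simply overwrites old slots. `struct ring`, `ring_publish()`, `ring_read()`, `RING_SLOTS` and `MSG_WORDS` are all hypothetical names and sizes. Each reader keeps its own message counter, so nothing is "consumed" from the ring, and a reader re-checks the published counter after copying to detect that the slot was overwritten underneath it.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define RING_SLOTS 8u   /* hypothetical capacity, must be a power of two */
#define MSG_WORDS  4u   /* hypothetical fixed message size in 32-bit words */

static_assert((RING_SLOTS & (RING_SLOTS - 1u)) == 0, "capacity must be a power of two");

struct ring {
    _Atomic uint32_t published;                    /* total messages published so far */
    _Atomic uint32_t slot[RING_SLOTS][MSG_WORDS];  /* message k lives in slot k % RING_SLOTS */
};

/* Server only: write the next message, then make it visible to readers. */
void ring_publish(struct ring *r, const uint32_t msg[MSG_WORDS])
{
    uint32_t n = atomic_load_explicit(&r->published, memory_order_relaxed);

    /* Order the earlier counter store before the payload writes below, so a
     * reader that sees any part of this message also sees published >= n. */
    atomic_thread_fence(memory_order_release);
    for (unsigned i = 0; i < MSG_WORDS; i++)
        atomic_store_explicit(&r->slot[n % RING_SLOTS][i], msg[i], memory_order_relaxed);
    atomic_store_explicit(&r->published, n + 1u, memory_order_release);
}

/* Client: try to read message number *next into out.
 * Returns 1 if a message was read (*next is advanced), 0 if nothing new,
 * -1 if the reader was too slow (*next is moved to the oldest safe message). */
int ring_read(struct ring *r, uint32_t *next, uint32_t out[MSG_WORDS])
{
    uint32_t head = atomic_load_explicit(&r->published, memory_order_acquire);

    if (head == *next)
        return 0;
    if (head - *next >= RING_SLOTS) {          /* slot already (being) overwritten */
        *next = head - (RING_SLOTS - 1u);
        return -1;
    }
    for (unsigned i = 0; i < MSG_WORDS; i++)
        out[i] = atomic_load_explicit(&r->slot[*next % RING_SLOTS][i], memory_order_relaxed);

    /* Re-check after the copy: if the writer lapped us meanwhile, discard it. */
    atomic_thread_fence(memory_order_acquire);
    head = atomic_load_explicit(&r->published, memory_order_relaxed);
    if (head - *next >= RING_SLOTS) {
        *next = head - (RING_SLOTS - 1u);
        return -1;
    }
    (*next)++;
    return 1;
}
```

In this sketch the readers only ever load from the ring, so a read-only mapping on the client side would be enough; how a reader sleeps until `published` changes is a separate question.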

marko
  • Thanks! But I'd want that all consumers are woken up as the messages are of broadcast type. I.e. all the clients have to process all of the messages. So all that I need to protect is that all bytes of the index (e.g. 4 for a typical `int32`) are written at once and read at once. So I'd use an atomic operation there. – Chris Aug 04 '12 at 11:40
  • So if that's the case, who updates the head pointer on the queue (or put another way: how does the producer know the capacity remaining in the queue?). You get no guarantees when the consumer threads are going to run with respect to each other or the producer. – marko Aug 04 '12 at 11:55
  • The only way I can see this working is tracking the number of consumers that have consumed each message and 'releasing' the entry back to the ring buffer once all have consumed it. The obvious objection to this approach is that the producer needs to know the number of consumers at the point of producing the message, and that adding or removing consumers dynamically will get interesting. I think the robust (lockless) solution to this problem looks like a queue for each consumer, with broadcasts inserted into each. – marko Aug 04 '12 at 11:56
  • Well, the producer doesn't care if the message was read. The size of the buffer is max_message_rate*max_interesting_time - that gives the guarantee that all messages in the interesting timeframe are available. And if a client consumes too slowly it won't be able to read the oldest messages... As memory is limited I need to handle the case when a buffer runs full. I hope that handling it this way is the obvious choice. – Chris Aug 04 '12 at 13:08
2

You could probably use a slightly different design based on sem_t if your system has them; some POSIX systems are still stuck on the 2001 version of POSIX.

You probably don't necessarily need a mutex/condition pair. This is just how it was designed a long time ago for POSIX.

Modern C (C11) and C++ (C++11) now bring you (or will bring you) atomic operations, a feature that is implemented in all modern processors but lacked support in most higher-level languages. Atomic operations are part of the answer for resolving a race condition in a ring buffer as you want to implement it. But they are not sufficient, because with them alone you can only wait actively by polling, which is probably not what you want.

Linux, as an extension to POSIX, has futex, which resolves both problems: it avoids races on updates by using atomic operations, and it can put waiters to sleep via a system call. Futexes are often considered too low-level for everyday programming, but I think that they actually aren't too difficult to use. I have written up things here.
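To give an idea, here is a minimal Linux-only sketch of waiting for the shared index with a futex. The names `futex()`, `index_publish()` and `index_wait()` are hypothetical helpers (glibc does not provide a `futex()` wrapper, so the raw system call is used), and it assumes that `_Atomic uint32_t` is a plain 32-bit word, as it is with gcc on Linux.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <limits.h>
#include <linux/futex.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/syscall.h>
#include <unistd.h>

static_assert(sizeof(_Atomic uint32_t) == 4, "futex words are 32 bits wide");

/* Thin wrapper around the raw system call (timeout and second address unused). */
static long futex(_Atomic uint32_t *addr, int op, uint32_t val)
{
    return syscall(SYS_futex, addr, op, val, NULL, NULL, 0);
}

/* Server: store the new index, then wake all processes sleeping on it. */
void index_publish(_Atomic uint32_t *index, uint32_t newest)
{
    atomic_store_explicit(index, newest, memory_order_release);
    futex(index, FUTEX_WAKE, INT_MAX);
}

/* Client: sleep until the index differs from last_seen and return the new value. */
uint32_t index_wait(_Atomic uint32_t *index, uint32_t last_seen)
{
    uint32_t cur;

    while ((cur = atomic_load_explicit(index, memory_order_acquire)) == last_seen) {
        /* The kernel puts us to sleep only if *index still equals last_seen;
         * otherwise the call returns immediately and the loop re-checks. */
        futex(index, FUTEX_WAIT, last_seen);
    }
    return cur;
}
```

The compare-and-sleep inside `FUTEX_WAIT` is what closes the window between checking the index and going to sleep, which is the job the mutex does for a condition variable.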

Jens Gustedt
  • `futex` is a little low-level for most folk. You can't use `futex` without atomic operations, for example. – Dietrich Epp Aug 03 '12 at 22:52
  • @DietrichEpp, this is a common opinion, but I don't think that it is completely justified. In particular using atomic operations shouldn't be a real problem these days (we are talking about linux with gcc or similar). Please see the link in my edit. – Jens Gustedt Aug 04 '12 at 06:14
2

You do need a mutex for `pthread_cond_wait()` as far as I know. The reason is that checking the condition and going to sleep are not one atomic step. The shared state could change between the check and the call, unless it's protected by a mutex.

It's possible that you can ignore this situation - the client might sleep past message 1, but when the subsequent message is sent then the client will wake up and find two messages to process. If that's unacceptable then use a mutex.

dave