
I have a single UDP thread which reads multiple datagrams through the recvmmsg system call from different multiplexed streams and pushes them into different circular (ring) buffers. These ring buffers are part of the Stream structures. Each stream sends a speech frame every 20 ms. So the UDP packets might look like this: F1S1 F1S2 F1S3 F2S1 and so on, OR in case of a burst like this: F1S1 F2S1 F3S1 F1S2 and so on. After reception, these packets will be processed in parallel by a library which works on the principle of ITP (independent task parallelism). The UDP thread has to dispatch these parallel tasks along with the list of packets to process. The limitation here is that tasks must not process two frames in parallel from the SAME stream AND tasks MUST have their own independent memory for frame handling. So I need to ensure FIFO execution order for these frames, and this will be done within the UDP thread before I spawn the tasks. Currently, when I receive these packets, I look up the streamId and place the frame in a circular buffer, which is part of the Stream structure, inside a for loop.

Here is the code which shows what's going on in the UDP thread.

while (!(*thread_stop))
{
    int nr_datagrams = recvmmsg(socket_handle->fd_udp, datagramS, VLEN, 0,
            NULL);
    .....
    for (int i = 0; i < nr_datagrams; i++)
    {
        ....
        hash_table_search(codec_buffers, _stream_id, (void **) &codecPtr))
        .....
        // Circular buffer for speech frames:
        // store the incoming sequence number for the newest packet
        codecPtr->circBuff.seqNum[codecPtr->circBuff.newestIdx] = _seq_num;

        // Copy the frame (skipping the first 2 * sizeof(uint16_t) bytes of
        // the datagram) into the slot for the newest index. The slot address
        // is computed from the base pointer each time; adding the offset to
        // entries itself would move the base further on every packet.
        memcpy(codecPtr->circBuff.entries
                    + codecPtr->circBuff.newestIdx * codecPtr->frameLength,
                2 * sizeof(uint16_t) + datagramBuff[i],
                codecPtr->frameLength);

        // Advance the newest index, wrapping at the end of the buffer
        // (assuming circBuffSize is the number of slots)
        codecPtr->circBuff.newestIdx =
                (codecPtr->circBuff.newestIdx + 1) %
                codecPtr->circBuffSize;
    }
}

My program should now pop frames from the ring buffers of the different streams which have recently received data, but NOT simply the latest data, as all recently received packets might belong to the same stream in case of a burst. How should I go forward? That is the dilemma I am facing.

Black_Zero
  • is it ring buffer implementation you are asking about? – LearningC Mar 14 '16 at 17:37
  • I think the most important question here is how your processing logic is structured. You can certainly use a circular buffer in a single thread but will it be useful? Will you even need it? – 5gon12eder Mar 14 '16 at 17:39
  • Yes it is ring buffer. I do need it because the datagrams destined for a specific ring buffer are interleaved with other datagrams from other buffers and the processing of these datagrams needs strict FIFO logic. – Black_Zero Mar 14 '16 at 17:44
  • let me post some part of my code that will clear the requirement I am after. – Black_Zero Mar 14 '16 at 17:46
  • So, what you are doing is handling out-of-order multiplexed messages, yes? – Martin James Mar 14 '16 at 17:46
  • that's partially right Martin. My incoming messages may or may not be out of order but the processing should be in order. – Black_Zero Mar 14 '16 at 17:56
  • Looking at your latest edit, I don't see what you need the buffers for. Aren't you effectively processing each datagram more or less immediately such that no buffer will ever have a size larger than 1? – 5gon12eder Mar 14 '16 at 17:57
  • @5gon12eder 1: what if I receive three messages from the same stream one after other? – Black_Zero Mar 14 '16 at 18:01
  • IMHO, OP needs the buffers to store messages that arrive out-of-order until all earlier messages have turned up and the lot can then be processed. – Martin James Mar 14 '16 at 18:01
  • If in this case, you only want to process a single one during the current iteration, then yes, you'll need a buffer. But think what will happen if you receive more than one datagram per destination and iteration on average. – 5gon12eder Mar 14 '16 at 18:06
  • @5gon12eder: can you please put your question in simple English. I am afraid its not my native language. – Black_Zero Mar 14 '16 at 18:19
  • What I wanted to say is: Yes, you'll need a buffer if this is what you really want. But be aware that you might lose a lot of messages if you only ever process at most one per iteration but might receive more than one. – 5gon12eder Mar 14 '16 at 18:23
  • I will process multiple messages in one iteration with the restriction that it should be from different streams. As I said before, if I receive 3 consecutive messages from a single stream they have to go through a FIFO in order to be processed serially. – Black_Zero Mar 14 '16 at 18:44
  • Do you care about the order of messages in a specific stream? The other end sending them in the correct order does not guarantee they are received in that same order. For example, the other host might send nine datagrams in order `A1 B1 C1 A2 B2 C2 A3 B3 C3`, but you could receive them in order `B1 A2 C1 C2 A1 A3 C3 B3 B2`. It is not common, but it does happen. The question is, do you need to enforce order? To re-dispatch `A1 B1 C1`, `A2 B2 C2`, `A3 B3 C3`? Or are you happy to forward `B1 A2 C1`, `C2 A1`, `A3 C3 B3`, `B2`? What are the **exact** dispatch rules? – Nominal Animal Mar 15 '16 at 18:16
  • I think I should rephrase and expand my question a bit more to make it easier to understand. – Black_Zero Mar 16 '16 at 09:48
  • Yes, Black_Zero; I for one can certainly understand your question much better now. I'd be very happy to rewrite my answer (I do believe I have something that will help), but I do have one question left: since you only have one thread, wouldn't it be simpler and better to process the messages in round-robin manner, so that during one round, one packet of each stream type is processed, one after another? It is easy to implement, and you can ensure the stream of the first message in a round is different than the stream of the last message. No wool needed.. – Nominal Animal Mar 16 '16 at 20:53
  • Furthermore, you could easily allow the streams a configurable ±N progress difference, while not allowing more than one consecutive message from each stream. This would be a type of relaxed round-robin processing. – Nominal Animal Mar 16 '16 at 20:55
  • Interestingly, I started my implementation yesterday with a sort of round-robin design, assuming that the packets I am getting from the streams are in sequential order and that in every iteration I will get one and only one frame from each stream to process. I will then cater for anomalies in later phases if any arise. As for wool, I have to use it because that's what the whole thing is about: to observe the power of ITP over explicit PThreads. Anyhow, any suggestions for improving the design are welcome. – Black_Zero Mar 17 '16 at 10:36
  • I rewrote my answer, but it basically boils down to using an unordered buffer for the UDP messages; you'll almost certainly also need a (cyclic) buffer for each decoded/decompressed audio stream. The reason I originally suggested threads was more about current devices having multiple cores, than parallelism making any of this easier. In fact, if only a single CPU core is available, I don't see any benefit parallelism would have.. But, perhaps I just lack the imagination. In any case, I hope you find my suggestion useful, even if just as a comparison point. – Nominal Animal Mar 17 '16 at 21:01

1 Answer


(This is a complete rewrite after OP clarified the question with additional details.)

I would suggest using an unordered buffer for the received datagrams, with a stream identifier, stream counter, and receive counter for each; and a latest dispatched counter for each stream:

#define _GNU_SOURCE
#include <stdlib.h>
#include <unistd.h>
#include <limits.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

/* Maximum number of buffered datagrams */
#define  MAX_DATAGRAMS  16

/* Maximum size of each datagram */
#define  MAX_DATAGRAM_SIZE  4096

/* Maximum number of streams */
#define  MAX_STREAMS  4

typedef struct {
    int             stream;                     /* -1 for none */
    unsigned int    counter;                    /* Per stream counter */
    unsigned int    order;                      /* Global counter */
    unsigned int    size;                       /* Bytes of data in data[] */
    char            data[MAX_DATAGRAM_SIZE];
} datagram;


void process(const int socketfd)
{
    /* Per-stream counters for latest dispatched message */
    unsigned int newest_dispatched[MAX_STREAMS] = { 0U };

    /* Packet buffer */
    datagram     buffer[MAX_DATAGRAMS];

    /* Sender IPv4 addresses */
    struct sockaddr_in  from[MAX_DATAGRAMS];

    /* Vectors to refer to packet buffer .data member */
    struct iovec        iov[MAX_DATAGRAMS];

    /* Message headers */
    struct mmsghdr      hdr[MAX_DATAGRAMS];

    /* Buffer index; hdr[i], iov[i], from[i] all refer
     * to buffer[buf[i]]. */
    unsigned int        buf[MAX_DATAGRAMS];

    /* Temporary array indicating which buffer contains
     * the next datagram to be dispatched for each stream */
    int                 next[MAX_STREAMS];

    /* Receive counter (not stream specific) */
    unsigned int        order = 0U;

    int                 i, n;

    /* Mark all buffers unused. */
    for (i = 0; i < MAX_DATAGRAMS; i++) {
        buffer[i].stream = -1;
        buffer[i].size = 0U;
    }

    /* Clear stream dispatch counters. */
    for (i = 0; i < MAX_STREAMS; i++)
        newest_dispatched[i] = 0U;

    while (1) {

        /* Discard datagrams received too much out of order. */
        for (i = 0; i < MAX_DATAGRAMS; i++)
            if (buffer[i].stream >= 0)
                if (buffer[i].counter - newest_dispatched[buffer[i].stream] >= UINT_MAX/2) {
                    /* Either in the past, or too far into the future */
                    buffer[i].stream = -1;
                    buffer[i].size = 0U;
                }

        /* Prepare for receiving new messages.
         * Stream -1 indicates unused/processed message. */
        for (n = 0, i = 0; i < MAX_DATAGRAMS; i++)
            if (buffer[i].stream == -1) {

                /* Prep the buffer. */
                buffer[i].stream = -1;
                buffer[i].counter = 0U;
                buffer[i].order = order + n;
                buffer[i].size = 0U;

                /* Local index n refers to buffer i. */
                buf[n] = i;

                /* Local index n refers to buffer i data. */
                iov[n].iov_base = buffer[i].data;
                iov[n].iov_len  = sizeof buffer[i].data;

                /* Clear received bytes counter. */
                hdr[n].msg_len = 0U;

                /* Source address to from[] array. */
                hdr[n].msg_hdr.msg_name = from + n;
                hdr[n].msg_hdr.msg_namelen = sizeof from[n];

                /* Payload per iov[n]. */
                hdr[n].msg_hdr.msg_iov = iov + n;
                hdr[n].msg_hdr.msg_iovlen = 1;

                /* No ancillary data. */
                hdr[n].msg_hdr.msg_control = NULL;
                hdr[n].msg_hdr.msg_controllen = 0;

                /* Clear received message flags */
                hdr[n].msg_hdr.msg_flags = 0;

                /* Prepared one more hdr[], from[], iov[], buf[]. */
                n++;
            }

        if (n < 1) {
            /* Buffer is full. Find oldest received datagram. */
            unsigned int max_age = 0U;
            int          oldest = -1;

            for (i = 0; i < MAX_DATAGRAMS; i++) {
                const unsigned int age = order - buffer[i].order;
                if (age >= max_age) {
                    max_age = age;
                    oldest = i;
                }
            }

            /* TODO: Dispatch the oldest received datagram:
             * Stream  buffer[oldest].stream
             * Data    buffer[oldest].data, buffer[oldest].size bytes
            */

            /* Update stream counters. */
            newest_dispatched[buffer[oldest].stream] = buffer[oldest].counter;             

            /* Remove buffer. */
            buffer[oldest].stream = -1;
            buffer[oldest].size   =  0;

            /* Need more datagrams. */
            continue;
        }

        n = recvmmsg(socketfd, hdr, n, 0, NULL);
        if (n < 1) {
            /* TODO: Check for errors. */
            continue;
        }

        /* Update buffer description for each received message. */
        for (i = 0; i < n; i++) {
            const int b = buf[i];

            buffer[b].order = order;          /* Already set, actually */
            buffer[b].size  = hdr[i].msg_len;

            /* TODO: determine stream and counter,
             *       based on from[i] and buffer[b].data.
             *       This assigns them in round-robin fashion. */
            buffer[b].stream  = order % MAX_STREAMS;
            buffer[b].counter = order / MAX_STREAMS;

            /* Account for the message received. */
            order++;
        }

        while (1) {

            /* Clear next-to-be-dispatched index list. */
            for (i = 0; i < MAX_STREAMS; i++)
                next[i] = -1;

            /* Find next messages to be dispatched. */
            for (i = 0; i < MAX_DATAGRAMS; i++)
                if (buffer[i].stream >= 0 && buffer[i].counter == newest_dispatched[buffer[i].stream] + 1U)
                    next[buffer[i].stream] = i;

            /* Note: This is one point where you will wish to 
             *       ensure all pending dispatches are complete,
             *       before issuing new ones. */

            /* Count (n) and dispatch the messages. */
            for (n = 0, i = 0; i < MAX_STREAMS; i++)
                if (next[i] != -1) {
                    const int b = next[i];
                    const int s = buffer[b].stream;

                    /* TODO: Dispatch buffer b, stream s. */

                    /* Update dispatch counters. */
                    newest_dispatched[s]++;
                    n++;
                }

            /* Nothing dispatched? */
            if (n < 1)
                break;

            /* Remove dispatched messages from the buffer. Also remove duplicates. */
            for (i = 0; i < MAX_DATAGRAMS; i++)
                if (buffer[i].stream >= 0 && buffer[i].counter == newest_dispatched[buffer[i].stream]) {
                    buffer[i].stream = -1;
                    buffer[i].size = 0U;
                }

        }
    }
}

Note that I omitted the points where you should wait for dispatched messages to complete (as there are multiple options, depending on how you dispatch and whether you wish to do "work" at the same time). Also, this code is only compile-tested, so it might contain logic bugs.

The loop structure is as follows:

  1. Discard buffered messages that are either in the past, or too far in the future, to be useful.

    The counters are cyclic. I added a description of the counter wrapping logic here.

  2. Construct headers for recvmmsg() for each free buffer slot.

  3. If there are no buffer slots free, find and dispatch or discard the oldest one, and repeat from step 1.

  4. Receive one or more messages.

  5. Based on the received messages, update the buffer slots.

    The main point is to determine the stream, the stream counter, and the number of bytes in the received message.

  6. Dispatch loop.

    This is a loop because if we receive messages out of order, but complete them later, we will need to dispatch more than one set of messages at a time.

    Within the loop, the stream index array (next[]) is cleared first.

    Then, we check the buffer for messages that are to be dispatched next. For this, we need the per-stream counters. This is done in a separate step, in case we ever receive duplicate UDP datagrams.

    If none of the streams have their next message buffered already, we exit this loop, and wait for new datagrams to arrive.

    The messages are dispatched next. The loop dispatches at most one message for each stream.

    After the dispatch, we remove the dispatched messages. Instead of looping over each stream and removing the buffer corresponding to that one, we loop over the entire buffer, so that we catch duplicated UDP messages, too.
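
The counter tests used in steps 1 and 6 can be isolated into two small helpers. This is only a sketch of the same comparisons the code above performs inline; the function names counter_is_stale() and counter_is_next() are made up for illustration. Because unsigned subtraction wraps around, the tests keep working when a stream counter overflows.

```c
#include <limits.h>

/* Returns 1 if a datagram with per-stream counter `counter` should be
 * discarded, given that `newest` was the last counter dispatched for that
 * stream: it is either in the past or implausibly far in the future.
 * Unsigned subtraction wraps modulo UINT_MAX + 1, so the test also holds
 * across counter overflow. */
static int counter_is_stale(unsigned int counter, unsigned int newest)
{
    return (counter - newest) >= UINT_MAX / 2;
}

/* Returns 1 if `counter` is exactly the next one to dispatch. */
static int counter_is_next(unsigned int counter, unsigned int newest)
{
    return counter == newest + 1U;
}
```

Note that a duplicate of the most recently dispatched datagram (counter == newest) is not stale by this test; the main loop removes those in the final pass of the dispatch loop instead.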

Note that buffers are not copied in the above sequence at all.

If the messages are compressed or uncompressed audio, you do need additional (cyclic) buffers for the uncompressed audio streams. Having a shared unordered buffer for all UDP messages has the benefit that you can always pick which audio stream to advance next (if you have received that datagram), and not accidentally spend so much time advancing one stream that other streams might run out of data, causing an audio glitch.

The size of the cyclic buffer for each audio stream should be at least three times the maximum size of a datagram. This lets you use the wrapping logic ((later % LIMIT) + LIMIT - (earlier % LIMIT)) % LIMIT for each sample, where a result > LIMIT/2 indicates inverse order, and append new data even during playback/decompression. (The dispatcher updates one index, audio playback the other. Just make sure they're accessed atomically.) Larger audio stream buffers may cause larger latencies.
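
As a minimal sketch of that wrapping logic, assuming positions are plain size_t indexes and LIMIT is passed in as limit (the helper names are illustrative, not from any library):

```c
#include <stddef.h>

/* Distance from `earlier` to `later` in a cyclic index space of `limit`
 * positions; the result is always in the range 0 .. limit - 1. */
static size_t wrapped_distance(size_t later, size_t earlier, size_t limit)
{
    return ((later % limit) + limit - (earlier % limit)) % limit;
}

/* Returns 1 if the two positions appear to be in inverse order,
 * i.e. `later` is actually behind `earlier`. */
static int inverse_order(size_t later, size_t earlier, size_t limit)
{
    return wrapped_distance(later, earlier, limit) > limit / 2;
}
```

For example, with limit 16, position 2 is four entries after position 14, while asking the question the other way round yields 12, which is more than half the buffer and therefore reported as inverse order.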

In summary, assuming audio stream demuxing and dispatching is at hand, there are two completely separate buffer structures to be used. For UDP datagrams, an unordered set of buffer slots is used. The buffer slots need a bit of bookkeeping (as shown in above code), but it is pretty simple to dispatch them in order for a number of different streams. Each audio stream does require a cyclic buffer (at least three times the maximum size of a (decompressed) datagram), though.

Unfortunately, I don't see any benefit of using independent task parallelism here (e.g. the wool C library).

In fact, it might be even simpler to add a structure per stream to describe the decompressor state, and prioritize them according to which cyclic audio buffer has the least buffered data left. Typical decompressors report if they need more data, so adding a temporary work area per stream (two compressed datagrams), would allow the decompressor to consume whole packets, but copy memory only when absolutely necessary.
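
A rough sketch of that prioritization follows. The stream_state structure and its field names are assumptions made up for this example; a real implementation would also hold the decompressor state and the temporary work area.

```c
#include <stddef.h>

/* Hypothetical per-stream state; the field names are illustrative only. */
typedef struct {
    size_t buffered;   /* Decoded data still waiting in the cyclic buffer */
    int    has_input;  /* Nonzero if the next datagram for this stream is buffered */
} stream_state;

/* Returns the index of the stream that should be advanced next: the one
 * with the least decoded data left among those that have input available,
 * or -1 if no stream can be advanced right now. */
static int pick_stream(const stream_state *streams, int count)
{
    int best = -1;

    for (int i = 0; i < count; i++) {
        if (!streams[i].has_input)
            continue;
        if (best < 0 || streams[i].buffered < streams[best].buffered)
            best = i;
    }
    return best;
}
```

Ties go to the lower stream index here; whether that matters depends on how evenly the streams deliver data.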


Edited to add details about the circular buffers:

There are two main methods of tracking the state of a circular buffer, plus a third, derivative method I suspect might be useful here:

  1. Using separate indexes for adding (head) and removing (tail) data

    If there is one producer and one consumer, the circular buffer can be maintained locklessly, as the producer only increments head, and the consumer increments tail.

    The buffer is empty when head == tail. If the buffer has SIZE entries, head = head % SIZE, and tail = tail % SIZE, then there are (head + SIZE - tail) % SIZE buffered entries.

    The downside is that a simple implementation will always have at least one free entry in the buffer, because the simple modular arithmetic above cannot distinguish between all entries being used and none being used. There are workarounds, at the cost of slightly more complex code.

    In the simple case, the buffer has SIZE - 1 - (head + SIZE - tail) % SIZE free entries.

  2. The buffered data begins at index start, and with length entries buffered.

    The buffer contents are always either consecutive in memory, or split into two parts in memory (with first part ending at the end of buffer space, and the second part starting at the start of the buffer space). Producers and consumers need to modify both start and length, so lockless usage requires a compare-and-swap atomic operation (and usually packing both into one integer).

    At any point, there are length entries used, and SIZE - length entries free in the circular buffer.

    When a producer appends n data entries, it copies the data starting at index (start + length) % SIZE, final at index (start + length + n - 1) % SIZE, and increments length by n. As mentioned, the data to be copied might be consecutive, or split in two parts.

    When a consumer consumes n data entries, it copies the data starting at index start, with the final entry at index (start + n - 1) % SIZE, and updates start = (start + n) % SIZE; and length = length - n;. Again, the consumed data might be split into two parts in memory (if it would otherwise span the end of the buffer).

  3. The derivatives.

    If there is only one producer thread/task, and one consumer, we can double the buffer state variables to allow data to be added or consumed from the buffer asynchronously, via DMA or async I/O.

    1. Using head, tail, head_pending, and tail_pending indexes

      When tail != tail_pending, the data from tail to tail_pending-1, inclusive, is being consumed. At completion, the consumer sets tail = tail_pending % SIZE.

      When head != head_pending, there is more data being added at index head to head_pending-1, inclusive. When the transfer completes, the producer sets head = head_pending % SIZE.

      Note that when using DMA, it is usually best to work with consecutive chunks in memory. In microcontrollers, it is common to use an interrupt to load the next DMA'ble chunk into the DMA registers, in which case you actually have head, head_pending, and head_next, or tail, tail_pending, and tail_next, with the size of each DMA'd chunk chosen so that you do not end up DMA'ing very short chunks near the split point (at physical end of the buffer), but keeping the interrupt rate acceptable.

      At any point, there are (head + SIZE - tail) % SIZE entries present in the buffer that can be consumed. Using simple modular arithmetic at least one entry in the buffer is always unused, so the maximum number of entries that can be added is SIZE - 1 - (head + SIZE - tail) % SIZE.

    2. Using start, length, incoming, and outgoing

      Here, start and length must be modified atomically, so that the other party will not be able to observe old start with new length or vice versa. This can be done locklessly as mentioned above, but care must be taken, as this is a common source of problems.

      At any point, the buffer contains length entries, with incoming entries being added (at (start + length) % SIZE to (start + length + incoming - 1) % SIZE, inclusive, if incoming > 0), and outgoing entries being consumed (at start to (start + outgoing - 1) % SIZE, inclusive, if outgoing > 0).

      When an incoming transfer completes, the producer increments length by incoming.

      When an outgoing transfer completes, the consumer updates start = (start + outgoing) % SIZE and length = length - outgoing.
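
To make the start/length method (2 above) concrete, here is a minimal single-threaded sketch. The ring type, the byte-sized entries, and the capacity of 8 are assumptions chosen for illustration, and no atomics are involved yet. It shows the two-part copy that is needed whenever the data spans the physical end of the buffer.

```c
#include <stddef.h>
#include <string.h>

#define SIZE 8  /* Illustrative capacity, in entries (bytes here) */

typedef struct {
    unsigned char data[SIZE];
    size_t        start;    /* Index of the oldest buffered entry */
    size_t        length;   /* Number of buffered entries */
} ring;

/* Appends n entries; returns 0 on success, -1 if there is not enough room. */
static int ring_put(ring *r, const unsigned char *src, size_t n)
{
    if (n > SIZE - r->length)
        return -1;
    size_t at    = (r->start + r->length) % SIZE;
    size_t first = (n < SIZE - at) ? n : SIZE - at;   /* Part before the wrap */
    memcpy(r->data + at, src, first);
    memcpy(r->data, src + first, n - first);          /* Part after the wrap */
    r->length += n;
    return 0;
}

/* Removes n entries into dst; returns 0 on success, -1 if fewer are buffered. */
static int ring_get(ring *r, unsigned char *dst, size_t n)
{
    if (n > r->length)
        return -1;
    size_t first = (n < SIZE - r->start) ? n : SIZE - r->start;
    memcpy(dst, r->data + r->start, first);
    memcpy(dst + first, r->data, n - first);
    r->start   = (r->start + n) % SIZE;
    r->length -= n;
    return 0;
}
```

Unlike the head/tail method, all SIZE entries can be in use at once, because length distinguishes a full buffer from an empty one.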

As to the atomic handling:

C compilers that support C11 provide a family of atomic functions that can be used to update the above variables atomically. Using the weak version allows maximum compatibility across different types of hardware. For start and length:

    _Atomic uint64_t buffer_state; /* Actual atomic variable */
    uint64_t old_state, new_state; /* Temporary variables */

    old_state = atomic_load(&buffer_state);
    do {
        uint32_t start = old_state >> 32;
        uint32_t length = (uint32_t)old_state;

        /* Update start and length as necessary */

        new_state = (uint64_t)length | ((uint64_t)start << 32);
    } while (!atomic_compare_exchange_weak(&buffer_state, &old_state, new_state));

For incrementing some buffer state variable state by amount, with buffer size being size, assuming all are of type size_t:

    size_t old, new; /* Temporary variables */

    old = atomic_load(&state) % size;
    do {
        new = (old + amount) % size;
    } while (!atomic_compare_exchange_weak(&state, &old, new));

Note that if the atomic_compare_exchange_weak() fails, it will copy the current value of state to old. That is why only one initial atomic load is needed.
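
For a self-contained illustration of the packed start/length update, here is a sketch of a consumer-side function. The name consume(), the capacity of 1024 entries, and the choice of putting start in the high 32 bits are assumptions for this example. It only shows the atomic state change; copying the entries out, and keeping the producer from overwriting them in the meantime, is left out.

```c
#include <stdatomic.h>
#include <stdint.h>

#define SIZE 1024u  /* Illustrative buffer capacity, in entries */

/* start in the high 32 bits, length in the low 32 bits */
static _Atomic uint64_t buffer_state;

/* Atomically marks up to `want` entries as consumed and returns how many
 * were actually taken; *from receives the index of the first one. */
static uint32_t consume(uint32_t want, uint32_t *from)
{
    uint64_t old_state = atomic_load(&buffer_state);
    uint64_t new_state;
    uint32_t start, length, n;

    do {
        start  = (uint32_t)(old_state >> 32);
        length = (uint32_t)old_state;
        n      = (want < length) ? want : length;

        new_state = ((uint64_t)((start + n) % SIZE) << 32)
                  | (uint64_t)(length - n);
    } while (!atomic_compare_exchange_weak(&buffer_state, &old_state, new_state));

    *from = start;
    return n;
}
```

Each failed compare-exchange reloads old_state, so start, length, and n are recomputed from the current state on every retry.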

Many C compilers also provide pre-C11 atomic built-ins. These are not standard, but they are common extensions (for example, the __sync built-ins originating in GCC). With them, start and length can be modified atomically using

    uint64_t buffer_state;         /* Actual atomic variable */
    uint64_t old_state, new_state; /* Temporary variables */

    do {
        uint32_t start, length;

        old_state = buffer_state; /* Non-atomic access */

        start = old_state >> 32;
        length = (uint32_t)old_state;

        /* Update start and/or length */

        new_state = (uint64_t)length | ((uint64_t)start << 32);
    } while (!__sync_bool_compare_and_swap(&buffer_state, old_state, new_state));

To increment some buffer state variable state by amount on many pre-C11 compilers, with buffer size being size, assuming all are of type size_t, you can use:

    size_t old_state, new_state; /* Temporary variables */

    do {
        old_state = state;
        new_state = (old_state + amount) % size;
    } while (!__sync_bool_compare_and_swap(&state, old_state, new_state));

All these atomics essentially spin until a modification succeeds atomically. While it would seem like two or more concurrent cores could fight endlessly, current cache architectures are such that one core will always win (first). So, in practice, as long as each core has some other work to do between executing one of such atomic update loops, these will work just fine. (And are, indeed, ubiquitous in lockless C code.)

The last part I'd like to mention is allowing partial dispatch of datagrams. This basically means that each datagram buffer slot has not just size (indicating the number of bytes in that slot), but also start. When a new datagram is received, start is set to zero. If a datagram cannot be dispatched (copied to per-stream buffer) completely, the buffer slot start and size are updated, but the stream dispatch counter is not incremented. That way, on the next round, the rest of this datagram is buffered.
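
A sketch of such a partial dispatch is shown below. The slot structure and dispatch_partial() are hypothetical names for this example, and the per-stream buffer is represented simply as a destination pointer with a given amount of free room.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical buffer slot with a start offset; names are illustrative. */
typedef struct {
    size_t        start;       /* Offset of the first undispatched byte */
    size_t        size;        /* Undispatched bytes remaining */
    unsigned char data[4096];
} slot;

/* Copies as much of the slot as fits into `room` bytes at dst.
 * Returns 1 if the datagram was dispatched completely (the caller may then
 * increment the stream dispatch counter and free the slot), or 0 if part of
 * it remains for the next round. *copied receives the bytes written. */
static int dispatch_partial(slot *s, unsigned char *dst, size_t room, size_t *copied)
{
    size_t n = (s->size < room) ? s->size : room;

    memcpy(dst, s->data + s->start, n);
    s->start += n;
    s->size  -= n;
    *copied = n;
    return s->size == 0;
}
```

The stream dispatch counter is deliberately left to the caller, so that it is only advanced once the function reports the datagram as complete.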

I could write a complete example showing how I'd decompress incoming datagrams from an unordered datagram buffer into several streams, using the partial buffered datagram scheme I briefly mentioned in the previous paragraph, but the exact implementation depends heavily on the programming interface the decompressor library has.

In particular, I personally prefer the interface used in e.g. POSIX iconv() function -- but perhaps returning a status code instead of the number of characters converted. Various audio and speech libraries have different interfaces, and it may even be impossible to convert them to such an interface. (As an example from a different arena, most SSL/TLS libraries for protecting socket communications do not have such an interface, as they always expect direct access to the socket descriptor; that makes single-threaded multiple-socket asynchronous SSL/TLS implementation "difficult". Well, more like "write from scratch if you want it".) For audio analysis of decompressed data, say using the FFTW library for Fast Fourier transform (or DCT, or Hartley, or one of the other transforms that excellent library performs, especially when optimized wisdom for that transform at that window size is available), the decompressed data is typically needed in fixed-size chunks. That too would affect the exact implementation.
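
To show what an iconv()-like interface with a status code could look like, here is a purely hypothetical sketch. The decode() function and its status codes are invented for illustration and do not belong to any real codec library; the "decoder" merely copies bytes. Like iconv(), it advances the input and output pointers and decrements the remaining byte counts, so the caller can resume exactly where it stopped.

```c
#include <stddef.h>

/* Hypothetical status codes; not taken from any real library. */
enum decode_status { DECODE_OK = 0, DECODE_NEED_OUTPUT = 1 };

/* Stand-in "decoder" that just copies bytes from input to output.
 * *in and *out are advanced, *inleft and *outleft are decremented.
 * Returns DECODE_OK when all input was consumed, or DECODE_NEED_OUTPUT
 * when the output area filled up first. */
static enum decode_status decode(const unsigned char **in, size_t *inleft,
                                 unsigned char **out, size_t *outleft)
{
    while (*inleft > 0) {
        if (*outleft == 0)
            return DECODE_NEED_OUTPUT;
        *(*out)++ = *(*in)++;
        (*inleft)--;
        (*outleft)--;
    }
    return DECODE_OK;
}
```

With this shape, a dispatcher can hand the decoder whatever part of a datagram is available and whatever room the stream buffer has, then call it again later with the remaining input.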

Nominal Animal
  • I have explicitly mentioned in the question that I have only one thread that is UDP which does the reception AND dispatching. Your whole solution is based on the premise of a "separate" processing thread, which I am not at a liberty to use in my design. – Black_Zero Mar 15 '16 at 08:43
  • The question might arise, "who then does the processing?" It's the independent TASKS that do the processing. UDP dispatches these TASKS with the required data to process, and it MUST not dispatch two successive speech frames contained in two consecutive datagrams from the same stream. – Black_Zero Mar 15 '16 at 08:45
  • UDP however MAY receive two consecutive datagrams (thus consecutive speech frames) from the same stream in one recvmmsg call. How then they should be processed in a FIFO manner, is the solution I am after. – Black_Zero Mar 15 '16 at 08:50
  • I have placed similar question here on SO involving the whole processing chain. Question remained here for almost 3 weeks with no answer so I deleted it and broke the question in pieces to make it understandable for the experts. But seems that does not work either. – Black_Zero Mar 16 '16 at 09:46