
As some might have noticed, I'm trying to implement a ring buffer. I want a certain amount of safety built into the data structure without losing too much efficiency.

My current solution uses two kinds of counters: an index, which is a direct offset into the buffer's memory, and a sequence number, which is just a counter of type size_t. The sequence number is what iterators use to access the ring buffer, so the ring buffer has to convert from sequence number to buffer index on every access. This is usually rather efficient:

size_t offset = seqNum - m_tailSeq;
size_t index = (m_tailIdx + offset) % m_size;

where seqNum is the sequence number to be converted, m_tailSeq is the sequence number of the oldest element in the buffer, m_tailIdx is the buffer index of that oldest element, and m_size is the size of the buffer memory.
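
To make the arithmetic concrete, here is a tiny stand-alone example with made-up numbers (a buffer of size 16 whose oldest element sits at index 4 and carries sequence number 100):

#include <cstddef>
#include <iostream>

int main()
{
    // hypothetical state of the ring buffer, chosen for illustration only
    std::size_t m_size    = 16;   // capacity of the underlying array
    std::size_t m_tailIdx = 4;    // buffer index of the oldest element
    std::size_t m_tailSeq = 100;  // sequence number of the oldest element

    // convert sequence number 113 to a buffer index
    std::size_t seqNum = 113;
    std::size_t offset = seqNum - m_tailSeq;             // 13 elements past the tail
    std::size_t index  = (m_tailIdx + offset) % m_size;  // (4 + 13) % 16 == 1

    std::cout << index << '\n';                           // prints 1
}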

However, if I keep adding elements to the buffer long enough, the sequence numbers will eventually overflow, so I have to check for that. And once I do, my short and sweet conversion turns into this monster:

size_type getIndex(size_type seqNum) const
{
    size_type headSeq = m_tailSeq + m_numElements;

    // sequence does not wrap around
    if (m_tailSeq < headSeq)
    {
        // bounds check
        if(m_tailSeq <= seqNum && seqNum < headSeq) {
            size_type offset = seqNum - m_tailSeq;
            return (m_tailIdx + offset) % m_size;
        } else {
            throw BaseException("RingBuffer: access out of bounds", __FILE__, __LINE__);
        }
    }
    // sequence does wrap around
    else if (headSeq < m_tailSeq)
    {
        //bounds check (inverted from above)
        if(seqNum < headSeq) {
            // +1 because the distance from m_tailSeq up to SIZE_TYPE_MAX plus
            // the distance from 0 up to seqNum is one short of the actual offset
            size_type offset = (SIZE_TYPE_MAX - m_tailSeq) + 1 + seqNum;
            return (m_tailIdx + offset) % m_size;
        } else if (seqNum >= m_tailSeq) {
            size_type offset = seqNum - m_tailSeq;
            return (m_tailIdx + offset) % m_size;
        } else {
            throw BaseException("RingBuffer: access out of bounds", __FILE__, __LINE__);
        }
    }
    // headSeq == m_tailSeq can only happen when the buffer is empty
    else {
        throw BaseException("RingBufferIterator: accessing empty buffer", __FILE__, __LINE__);
    }
}

This amounts to two integer additions, one integer subtraction, three integer comparisons, and one modulo operation in the best case on every single buffer access. Needless to say, iterating over the buffer becomes pretty expensive. Since I want to use this buffer in high-performance scenarios (i.e. as the event queue in a soft real-time application), I would like the data structure to be as efficient as possible.

The current use case would be as an event buffer. One (or possibly more than one) system writes events into the buffer, and several other systems process these events at their own pace without removing them. When the buffer is full, old events are simply overwritten. This way I always have a record of the last few hundred events, and different systems can go over them at their respective update rates and pick out the events that are relevant for them. Each system keeps an iterator that points into the ring buffer so it knows where it left off last time and where to resume. When a system starts processing events, it needs to determine whether its iterator is still valid or whether the data it points to has been overwritten. Events are likely to be processed in big chunks at a time, so incrementing and dereferencing should be quick. So basically we're looking at an MPMC ring buffer in a potentially multithreaded context.
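
For illustration, the validity check a consumer has to make could boil down to a single distance comparison on the sequence numbers. This is only a sketch with hypothetical names (single-threaded, not my actual class):

#include <cstddef>

// Sketch only: a consumer remembers the sequence number it stopped at and
// asks the buffer whether that position still holds live data.
struct RingBufferState
{
    std::size_t m_tailSeq;     // sequence number of the oldest element still stored
    std::size_t m_numElements; // number of live elements

    bool isStillValid(std::size_t consumerSeq) const
    {
        // unsigned subtraction is well defined and handles wrap-around,
        // provided the element count is far smaller than SIZE_MAX
        std::size_t behind = consumerSeq - m_tailSeq;
        return behind < m_numElements; // false: overwritten (or nothing new yet)
    }
};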

The only solution I can come up with myself is to move the burden of error checking onto the user of the buffer: the user first checks (by some means) that its iterator into the buffer is still valid, makes sure a certain stretch of the buffer stays valid, and then iterates over that stretch without any further checks. However, this seems error-prone, since the safety of the access would have to be verified in multiple parts of the program instead of in just one place, and it will get hairy if I ever decide to make the buffer thread safe.
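
From the consumer's side I picture it roughly like this. Everything here is hypothetical (validRange(), rawAt() and handle() are invented names, and the sketch assumes nothing is overwritten while the loop runs):

#include <cstddef>

// sequence number of the oldest valid element plus the number of valid elements
struct SeqRange { std::size_t first; std::size_t count; };

// "check once, then iterate unchecked": one up-front validation of the
// consumer's position, followed by raw accesses without per-element checks
template <typename Buffer, typename Handler>
void processNewEvents(Buffer& buffer, std::size_t& resumeSeq, Handler handle)
{
    SeqRange range = buffer.validRange();        // the single up-front check
    if (resumeSeq - range.first > range.count)   // unsigned distance handles wrap-around
        resumeSeq = range.first;                 // we fell behind; skip the lost events

    std::size_t end = range.first + range.count;
    while (resumeSeq != end)
        handle(buffer.rawAt(resumeSeq++));       // unchecked access
}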

Am I missing something? Can this be done any better? Am I committing some beginners mistake?

MadMonkey
    Put your bounds check under defines which can be enabled/disabled depending on a debug mode or not. For example, only vector::at guarantees bounds checking. But many compilers bounds-check vector::operator[] in debug mode. – Neil Kirk Oct 17 '14 at 16:07
  • Take a look at sequence numbers in network protocols. Overflow of unsigned integers is well defined in C++. – magras Oct 17 '14 at 16:29
  • You probably don't want to use exceptions. Also, branch instructions can be more expensive than just the instruction. Division/modulus can also be expensive. The best recommendation is probably to do accurate benchmarking. You may want to take a look at an open source implementation of a [ring buffer](https://github.com/LMAX-Exchange/disruptor) to see different ways to implement them. – Jason Oct 17 '14 at 20:28
  • @Jason, I know that branches can be very costly, but I haven't yet found a way to do without them. If you have any hints I would be happy to hear them. I have also looked at several ring buffer implementations, but not the one you mentioned, as I have restricted myself to C++. I'll read it though. Most implementations I looked at simply don't do bounds checking. I'm seriously starting to doubt whether there is an efficient way to implement it this way, or whether I simply have to trust that the user will do proper bounds checking... – MadMonkey Oct 17 '14 at 20:51
  • @sseeland There's no cost free way to do bounds checks, other than disallowing user indexing by design (most ring-buffers I'm aware of don't). There are micro-optimizations to avoid division/modulus overhead as well, but you should really worry about using exceptions first. – Jason Oct 17 '14 at 21:20
  • @Jason First of all I'm not convinced that exceptions are such a great cost if they are not triggered. And secondly I wouldn't know how to do proper error reporting without them. But that's not the topic here. I'm just wondering if there is a more efficient way to make sure the clients don't get wrong data. I'll edit my question to clarify the use cases of the buffer, maybe that will help. – MadMonkey Oct 17 '14 at 21:25
  • @sseeland I think you might be optimizing a bit prematurely. However, an easy change would be altering your sequence numbers to be monotonic (64 bit unsigned values will effectively never overflow). That simplifies your *stale* check. There are tricks to avoiding division/modulus as well, but honestly there are other issues with the implementation I'd be more concerned with before being able to justify the optimizations. – Jason Oct 17 '14 at 21:55
  • If you are designing this for a multi-threaded environment, you'll need to add more precautions that will likely include significantly more overhead than your bounds checking. For the latter, I regret I cannot + *n* with *n* > 1 @NeilKirk's comment which suggests what I would do in this situation, too. – 5gon12eder Oct 18 '14 at 12:05
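
As a side note on Neil Kirk's suggestion above: combined with the unsigned-distance trick from the other comments, a compile-time-toggled bounds check could look roughly like this inside the existing class. This is only a sketch; RINGBUFFER_CHECKED is an invented macro name, and the members reused here (m_tailSeq, m_tailIdx, m_size, m_numElements, BaseException) are the ones from the question.

// sketch only: in a release build RINGBUFFER_CHECKED would be left
// undefined and the bounds check compiles away entirely
size_type getIndexUnchecked(size_type seqNum) const
{
    size_type offset = seqNum - m_tailSeq;   // unsigned wrap-around is well defined
    return (m_tailIdx + offset) % m_size;
}

size_type getIndex(size_type seqNum) const
{
#ifdef RINGBUFFER_CHECKED
    if (seqNum - m_tailSeq >= m_numElements)
        throw BaseException("RingBuffer: access out of bounds", __FILE__, __LINE__);
#endif
    return getIndexUnchecked(seqNum);
}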

1 Answer


As I mention in the comments, unsigned integer overflow is a well-defined operation. That is the key point for implementing efficient sequence numbers in C++: we can simply subtract two unsigned integers to get the distance, and then forward that distance to a function that implements access by index with bounds checks. As always, this works as long as all possible distances stay below half of the maximum sequence number value.

#include <array>
#include <climits>
#include <iostream>
#include <stdexcept>

// start just below the wrap-around point so the example actually exercises it
unsigned int const SEQUENCE_NUMBER_FIRST = UINT_MAX-10;

class RingBuffer
{
public:
    void PushBack( char c )
    {
        // write at the current tail sequence number, then advance the tail
        GetBySeqNumber(m_tailSeq++) = c;
        // if the buffer was already full, the write above landed on the oldest
        // element, so advance the head past it (overwrite semantics)
        if( Size() == m_buffer.size()+1 )
            PopFront();
    }
    void PopFront()
    {
        ++m_headSeq;
        m_offset = (m_offset+1) % m_buffer.size();
    }
    char& GetByIndex( size_t n )
    {
        if( n >= Size() )
            throw std::out_of_range("Hello, world!");
        return m_buffer[ (n+m_offset) % m_buffer.size() ];
    }
    char& GetBySeqNumber( unsigned int n )
    {
        // Unsigned wrap-around is well defined in C++;
        // with a signed integer this subtraction could
        // overflow, which would be undefined behavior.
        return GetByIndex( n-m_headSeq );
    }
    size_t Size() const
    {
        return m_tailSeq - m_headSeq;
    }
private:
    size_t m_offset = 0;
    unsigned int m_headSeq = SEQUENCE_NUMBER_FIRST;
    unsigned int m_tailSeq = SEQUENCE_NUMBER_FIRST;
    std::array<char,26> m_buffer;
};

int main()
{
    // initialize
    RingBuffer buf;
    for( char i=0; i<26; ++i )
        buf.PushBack( 'a'+i );

    // access through sequence numbers
    // (add or subtract one to get an out_of_range exception)
    for( unsigned int i=0; i<buf.Size(); ++i )
        std::cout << buf.GetBySeqNumber( SEQUENCE_NUMBER_FIRST+i );
    std::cout << std::endl;

    // push some more to overwrite first 10 values
    for( char i=0; i<10; ++i )
        buf.PushBack( '0'+i );

    // access through indexes
    // (add or subtract one to get an out_of_range exception)
    for( size_t i=0; i<buf.Size(); ++i )
        std::cout << buf.GetByIndex(i);
    std::cout << std::endl;

    return 0;
}
magras
  • I'm afraid I can't see how this can work. It works only when `head_` refers to `buffer_[0]`, as far as I can see. But the idea of a ring buffer is that the data can basically start anywhere in the array and just wraps around. And if you keep adding data, the "beginning" of the data just moves. So how can your code handle a situation where the valid data starts at `buffer_[5]` and wraps around until `buffer_[4]`? – MadMonkey Oct 18 '14 at 15:52
  • You missed my point. Of course you need to insert a proper implementation with range checks into `GetByIndex()`; that is why I supplied a full version of the ring buffer. – magras Oct 18 '14 at 17:33
  • Wow, it took me a long time to see it but now I know what you mean! This works brilliantly! – MadMonkey Oct 18 '14 at 22:30