
I am looking for something similar to the shared-memory (SHM) SPSC queue setup offered by boost::lockfree::spsc_queue and boost::interprocess, but without allocating strings separately: I want to store them flat, i.e. next to each other, for maximum efficiency.

If I understand correctly, that setup stores string offsets in the queue and allocates memory for the strings somewhere else in the SHM.

Queue design can be:

| size | string 1 | size | string 2 | size | string 3 | ...
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
                    SHM segment

in a circular buffer fashion. Idea:

struct Writer {
    std::byte *shm;

    void write(std::string_view str) {
        // write size
        const uint32_t sz = str.size();
        std::memcpy(shm, &sz, sizeof(sz));
        shm += sizeof(sz);

        // write string
        std::memcpy(shm, str.data(), sz);
        shm += sz;

    }
};
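A matching reader for this sketch could look like the following. This is a hypothetical counterpart, and like the Writer above it ignores wrap-around and producer/consumer synchronization, which a real queue must handle (the Writer is repeated so the example is self-contained):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <string_view>

struct Writer {
    std::byte *shm;

    void write(std::string_view str) {
        // write size
        const uint32_t sz = static_cast<uint32_t>(str.size());
        std::memcpy(shm, &sz, sizeof(sz));
        shm += sizeof(sz);

        // write string
        std::memcpy(shm, str.data(), sz);
        shm += sz;
    }
};

struct Reader {
    const std::byte *shm;

    std::string read() {
        // read size
        uint32_t sz;
        std::memcpy(&sz, shm, sizeof(sz));
        shm += sizeof(sz);

        // copy string out of the shared buffer
        std::string str(reinterpret_cast<const char*>(shm), sz);
        shm += sz;
        return str;
    }
};
```

Round-tripping a couple of strings through a byte buffer shows the flat layout: each entry is a 4-byte length immediately followed by the characters.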
madhur4127
  • With variable-length reservations in an array as a circular buffer? That might be doable for the SPSC case with each entry having a length. (Keeping C++ happy with reuse of memory for different objects might be inconvenient, but hopefully placement-new is fine. In assembly there's no problem, so it's just a matter of getting compilers to do the right thing). Or is there a fixed length that you can pad every string to, like buckets of `struct buffer_entry { char buffer[32]; }` or `char buffer[64][n_entries];`? – Peter Cordes Dec 25 '22 at 07:30
  • If your strings are short (< 15 bytes), an array of std::string would have the arrangement you describe. If your strings are longer, you'd need to write your own string objects. You can't change the way std::string stores strings, only its allocators. – Michaël Roy Dec 25 '22 at 07:32
  • @MichaëlRoy string size is > SSO. – madhur4127 Dec 25 '22 at 07:38
  • @madhur4127: Are you willing to use a fixed amount of storage per string? For example 120 bytes per string regardless of how long it really is. – John Zwinck Dec 25 '22 at 07:44
  • @JohnZwinck i really care about latency, anything that's optimal is good enough. I am eager to hear your idea! – madhur4127 Dec 25 '22 at 07:46
  • If string size is large, you should make your own string class. You cannot change the internal workings of std::string; it will always store a pointer to somewhere else next to the length. But with strings so long, what exactly is your potential gain in latency? I'd say it is a bit less than one line load time from L3? And if there is only one more hiccup during the other 8 lines' load, any gain instantly disappears. John's solution below is a good place to start and measure performance. Let me know how it goes. The most important thing will be the order in which the strings are stored/read. – Michaël Roy Dec 25 '22 at 08:52
  • @MichaëlRoy the gain in performance is not having to deal with allocations and deallocations. The allocation algorithms aren't really cheap and add extra latency, whereas this design is very simple and fast. The reader can allocate a big enough buffer and memcpy from the SHM into its own memory to process it. Optimal memory usage and least latency. – madhur4127 Dec 25 '22 at 09:06
  • @madhur4127 Memory latency performance gains are in where the 64-byte lines of memory you need are located when you need them. If you only access the strings (read and/or write) in your SHM in sequential order, then the gains will be amazing; if it's random access, any actual gains will be more random. – Michaël Roy Dec 25 '22 at 09:11
  • TL;DR: I don't know enough about the context to properly answer. Ok, I spent a few hours thinking about the problem. But now that I read your comment "Optimal memory usage and least latency" the best solution I can think of is protocol buffers. In your design there is a total lack of alignment. Isn't that optimizing for throughput? Are you optimizing for Round Trip Time or is this a Simplex network kinda deal? I can't really help without knowing the limitations of the hardware reading the shared memory. Each string can be up to 4GB long. What does your benchmark say about that? – viraltaco_ Jan 03 '23 at 06:44
  • @viraltaco_ lack of alignment is easy to deal with, though, although on x86 it doesn't matter. I want to optimize for the latency from write to read. That means producer and consumer should both be quick. I don't think string size actually matters because the SHM is of a fixed size and messages will be smaller than that. Although my use case is 8MB SHM with <4KB strings. – madhur4127 Jan 03 '23 at 06:57
  • "although on x86 it doesn't matter" In my experience nothing could be further from the truth. To illustrate that I recommend watching (all of/the following) [Chandler Carruth Talk on this very topic](https://youtu.be/2EWejmkKlxs). "I don't think string size actually matters because the SHM is of a fixed size" That's my point. The size doesn't really matter. If you have 4KB strings then a fixed-size slice of the SHM "pool" could be faster (should be benchmarked). – viraltaco_ Jan 03 '23 at 20:02

2 Answers


tl;dr

It is possible, although you'll have to deal with a bunch of extra edge-cases.

  • Here's a working godbolt of a spsc queue that fulfils your requirements.
    (see bottom of this post in case the link goes bad)
  • Please also check 2. Potential alternatives for viable alternatives.

1. Assumptions

Given the information you provided, I'm going to assume you want the following properties for the single-producer, single-consumer (spsc) queue:

  • It should be backed by a ring buffer (that might be located within a shared memory region)
    (this is also the case for boost::lockfree::spsc_queue)
  • Elements should be returned in FIFO order
    (like boost::lockfree::spsc_queue)
  • The string-elements should be stored entirely within the ring buffer (no additional allocations)
  • The queue should be wait-free
    (like boost::lockfree::spsc_queue, but not like most stuff in boost::interprocess (e.g. boost::interprocess::message_queue utilizes mutexes))

2. Potential alternatives

Depending on your requirements there could be a few alternative options:

  • Fixed-length strings:
    Like @John Zwinck suggested in his answer you could use a fixed-length string buffer.
    The trade-off would be that your maximum string length is bounded by this size, and - depending on your expected variation of possible string sizes - might result in a lot of unused buffer space.
    If you go this route I'd recommend boost::static_string - it's essentially a std::string with the dynamic allocation removed, relying solely on its internal buffer.
    i.e. boost::lockfree::spsc_queue<boost::static_string<N>>, where N is the maximum size for the string values.

  • Only store pointers in the queue and allocate the strings separately:
    If you're already using boost::interprocess you could use a boost::interprocess::basic_string with a boost::interprocess::allocator that allocates the string separately in the same shared memory region.
    Here's an answer that contains a full example of this approach (it even uses boost::lockfree::spsc_queue) (direct link to code example)
    The trade-off in this case is that the strings will be stored somewhere outside the spsc queue (but still within the same shared memory region).
    If your strings are relatively long this might even be faster than storing the strings directly within the queue (the ring buffer can be a lot smaller if it only needs to store pointers, and therefore would have a much better cache-locality).
    (cache locality won't help in this case - see this excellent comment from @Peter Cordes)
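For the fixed-length alternative above, the slot type could be sketched without boost as a flat, trivially copyable struct. The name and the 120-byte capacity here are illustrative assumptions, not from any library:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <string_view>

// Hypothetical fixed-capacity slot: flat and trivially copyable,
// so an array of these can live directly inside a shared memory region.
template <std::size_t N>
struct fixed_string_slot {
    std::uint32_t size = 0;
    char data[N];

    bool assign(std::string_view s) {
        if (s.size() > N) return false;  // string doesn't fit into the slot
        size = static_cast<std::uint32_t>(s.size());
        std::memcpy(data, s.data(), s.size());
        return true;
    }

    std::string_view view() const { return {data, size}; }
};
```

The trade-off mentioned above is visible directly in the type: every slot occupies sizeof(std::uint32_t) + N bytes regardless of how short the stored string is.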


3. Design Considerations

3.1 Wrapped-around writes

A ringbuffer with fixed-size elements essentially splits the raw buffer into element-sized slots that are contiguous within the buffer, e.g.:

 /---------------------- buffer ----------------------\
/                                                      \
+----------+----------+----------+----------+----------+
| Object 1 | Object 2 | Object 3 |    ...   | Object N |
+----------+----------+----------+----------+----------+

This automatically ensures that all objects within the buffer are contiguous. i.e. you never have to deal with a wrapped-around object:

 /---------------------- buffer ----------------------\
/                                                      \
+-----+----------+----------+----------+----------+-----+
|ct 1 |          |          |          |          | Obje|
+-----+----------+----------+----------+----------+-----+

If the element-size is not fixed however, we do have to handle this case somehow.
Assume for example an empty 8-byte ringbuffer with the read and write pointer on the 7th byte (the buffer is currently empty):

                                 /--------- read pointer
                                 v
+----+----+----+----+----+----+----+----+
|  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
+----+----+----+----+----+----+----+----+
|    |    |    |    |    |    |    |    |
+----+----+----+----+----+----+----+----+
                                 ^
                                 \---------- write pointer

If we now attempt to write "bar" into the buffer (prefixed by its length), we would get a wrapped-around string:

                                 /--------- read pointer
                                 v
+----+----+----+----+----+----+----+----+
|  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
+----+----+----+----+----+----+----+----+
|  a |  r |    |    |    |    | 03 |  b |
+----+----+----+----+----+----+----+----+
             ^
             \----------------------------- write pointer

There are 2 ways to deal with this:

  • Let the consumer deal with wrapped-around writes.
  • Don't allow wrapped-around writes; ensure each string is written in a contiguous block within the buffer.
3.1.1 Making it part of the interface

The 1st option would be the easiest to implement, but it would be quite cumbersome for the consumer of the queue to use, because it has to deal with 2 separate pointers + sizes that in combination represent the string. A potential interface for this could be:

class spsc_queue {
  // ...

  bool push(const char* str, std::size_t size);

  bool read(
     const char*& str1, std::size_t& size1,
     const char*& str2, std::size_t& size2
  );
  bool pop();

  // ...
};

// Usage example (assuming the state from "bar" example above)
const char* str1;
std::size_t size1;
const char* str2;
std::size_t size2;
if(queue.read(str1, size1, str2, size2)) {
  // str1 would point to buffer[7] ("b")
  // size1 would be 1
  // str2 would point to buffer[0] ("ar")
  // size2 would be 2
  queue.pop();
}
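A consumer that needs the wrapped-around string in one contiguous piece would then have to stitch the two fragments back together, e.g. with a hypothetical helper like this:

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Reassembles a wrapped-around entry from the two
// (pointer, size) fragments returned by read().
std::string reassemble(const char* str1, std::size_t size1,
                       const char* str2, std::size_t size2) {
    std::string out;
    out.reserve(size1 + size2);
    out.append(str1, size1);  // part at the end of the buffer
    out.append(str2, size2);  // part wrapped to the front
    return out;
}
```

For the "bar" example above this turns ("b", 1) and ("ar", 2) back into "bar" - at the cost of an extra copy on every wrapped read.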

Having to deal with 2 pointers and 2 sizes all the time for the odd case of a wrapped-around write is not the best solution imho, so I went with option 2:

3.1.2 Prevent wrap-arounds

The alternative option is to prevent wrap-arounds from occurring in the first place.

A simple way to achieve this is to add a special "wrap-around" marker that tells the reader to immediately jump back to the beginning of the buffer (and therefore skip all bytes after the wrap-around marker).

Example writing "bar": (WA represents a wrap-around marker)

                                 /--------- read pointer
                                 v
+----+----+----+----+----+----+----+----+
|  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
+----+----+----+----+----+----+----+----+
| 03 |  b |  a |  r |    |    | WA |    |
+----+----+----+----+----+----+----+----+
                       ^
                       \------------------- write pointer

So once the reader tries to read the next element it'll encounter the wrap-around marker. This instructs it to directly go back to index 0, where the next element is located:

   /--------------------------------------- read pointer
   v
+----+----+----+----+----+----+----+----+
|  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
+----+----+----+----+----+----+----+----+
| 03 |  b |  a |  r |    |    | WA |    |
+----+----+----+----+----+----+----+----+
                       ^
                       \------------------- write pointer

This technique allows all strings to be stored contiguously within the ring buffer - with the small trade-off that the end of the buffer might not be fully utilized, plus a couple of extra branches in the code.

For this answer I chose the wrap-around marker approach.
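The reader side of this approach can be sketched as follows. This is a simplified illustration with single-byte lengths and 0xFF as an assumed marker value; the full implementation in section 4 additionally handles the empty/full checks and multi-byte lengths:

```cpp
#include <cassert>
#include <cstddef>
#include <string_view>

constexpr unsigned char WA = 0xFF;  // hypothetical wrap-around marker value

// Reads the length-prefixed entry at read_off and advances read_off past it.
// If a wrap-around marker is found, the reader jumps back to index 0 first.
std::string_view read_entry(const unsigned char* buf, std::size_t& read_off) {
    if (buf[read_off] == WA) {
        read_off = 0;  // marker: skip the unusable tail of the buffer
    }
    const unsigned char len = buf[read_off];
    const char* str = reinterpret_cast<const char*>(&buf[read_off + 1]);
    read_off += 1 + len;
    return {str, len};
}
```

With the buffer state from the diagram above (marker at index 6, "bar" stored at index 0) the reader jumps from index 6 to index 0 and returns "bar".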

3.2 What if there's no space for a wrap-around marker?

Another problem comes up once you want string sizes above 255 - at that point the size field needs to be larger than 1 byte.

Assume we use a 2 byte-length and write "foo12" (length 5) into the ring buffer:

   /--------------------------------------- read pointer
   v
+----+----+----+----+----+----+----+----+
|  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
+----+----+----+----+----+----+----+----+
| 05 | 00 |  f |  o |  o |  1 |  2 |    |
+----+----+----+----+----+----+----+----+
                                      ^
                                      \---- write pointer

so far so good, but as soon as the read pointer catches up we have a problem:
there is only a single byte left to write before we need to wrap around, which is not enough to fit a 2-byte length!

So we would need to wrap-around the length on the next write (writing "foo" (length 3) into the ringbuffer):

                                      /---- read pointer
                                      v
+----+----+----+----+----+----+----+----+
|  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
+----+----+----+----+----+----+----+----+
| 00 |  f |  o |  o |    |    |    | 03 |
+----+----+----+----+----+----+----+----+
                       ^
                       \------------------- write pointer

There are three potential ways this could be resolved:

  • Deal with wrapped-around lengths in the implementation.
    The downside of this approach is that it introduces a bunch more branches into the code (a simple memcpy for the size won't suffice anymore), makes aligning strings more difficult, and goes against the design we chose in 3.1.
  • Use a single byte wrap-around marker (regardless of the string size)
    This would allow us to place a wrap-around marker and prevent wrapped-around string sizes:
                                          /---- read pointer
                                          v
    +----+----+----+----+----+----+----+----+
    |  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
    +----+----+----+----+----+----+----+----+
    | 03 | 00 |  f |  o |  o |    |    | WA |
    +----+----+----+----+----+----+----+----+
                                ^
                                \-------------- write pointer
    
    The downside of this approach is that it's rather difficult to differentiate between a wrap-around marker and an actual string size. We would also need to probe the first byte of each length to check whether it's a wrap-around marker before reading the full length integer.
  • Automatically advance the write pointer if the remaining size is insufficient for a length entry. This is the simplest solution I managed to come up with, and the one I ended up implementing. It basically allows the writer to advance the write pointer back to the beginning of the buffer (without writing anything in it) if the remaining size would be less than the size of the length integral type. (The reader recognizes this and jumps back to the beginning as well.)
                                          /---- read pointer
                                          v
    +----+----+----+----+----+----+----+----+
    |  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
    +----+----+----+----+----+----+----+----+
    | 03 | 00 |  f |  o |  o |    |    |    |
    +----+----+----+----+----+----+----+----+
                                ^
                                \-------------- write pointer
    

3.3 Destructive interference

Depending on how fast your reader is in comparison to your writer you might have a bit of destructive interference within the ring buffer.

If, for example, your L1 cache line size is 128 bytes, you use 1-byte lengths for the strings in the ring buffer, and you only push length-1 strings, e.g.:

   /--------------------------------------- read pointer
   v
+----+----+----+----+----+----+----+----+
|  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |
+----+----+----+----+----+----+----+----+
| 01 |  a | 01 |  b | 01 |  c |    |    | ...
+----+----+----+----+----+----+----+----+
                                 ^
                                 \--------- write pointer

Then this would result in roughly 64 string entries being stored on the same cache line, which are continuously written by the producer while they are read by the consumer => a lot of potential interference.

This can be prevented by padding the strings within the ring buffer to a multiple of your cache line size (available in C++ as std::hardware_destructive_interference_size).

i.e. strings padded to 4 bytes:

   /--------------------------------------- read pointer
   v
+----+----+----+----+----+----+----+----+----+
|  0 |  1 |  2 |  3 |  4 |  5 |  6 |  7 |  8 |
+----+----+----+----+----+----+----+----+----+
| 01 |  a |    |    | 01 |  c |    |    |    | ...
+----+----+----+----+----+----+----+----+----+
                                           ^
                                           \--------- write pointer

The trade-off here is of course that this will potentially waste a lot of space within the ring buffer.

So you'll have to profile how much padding you want for your string values.

The padding value you choose should be between those two values:
1 <= N <= std::hardware_destructive_interference_size

  • Where 1 is basically no padding => best space utilization, worst potential interference
  • And std::hardware_destructive_interference_size => worst space utilization, no potential interference
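The rounding itself is a simple round-up to the next multiple of the chosen padding (the same behavior as the pad_size helper in the implementation in section 4):

```cpp
#include <cassert>
#include <cstddef>

// Rounds size up to the next multiple of `padding`.
constexpr std::size_t pad_size(std::size_t size, std::size_t padding) {
    return (size % padding == 0) ? size : size + (padding - size % padding);
}
```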

4. Implementation

This is a full implementation of a wait-free, string-only, spsc fifo queue - based on the design considerations listed above.

I've only implemented the bare minimum interface required, but you can easily create all the utility functions boost::lockfree::spsc_queue provides around those 3 core functions:

godbolt

template<std::unsigned_integral size_type, std::size_t padding = 1>
class lockfree_spsc_string_queue {
public:
    lockfree_spsc_string_queue(void* buffer, std::size_t buffer_size, bool init = false);

    // producer:

    // tries to push the given string into the queue
    // returns true if adding the string was successful
    // (contents will be copied into the ringbuffer)
    bool push(std::string_view str);

    // consumer:

    // reads the next element in the queue.
    // returns an empty optional if the queue is empty,
    // otherwise a string_view that points to the string
    // within the ringbuffer.
    // Does NOT remove the element from the queue.
    std::optional<std::string_view> front();

    // Removes the next element from the queue.
    // Returns true if an element has been removed.
    bool pop();
};
  • size_type is the integral type that is used to store the length of the strings within the buffer, i.e. if you use unsigned char each string could be at most 254 bytes in length (with unsigned short (assuming 2 bytes) it would be 65534, etc...) (the maximum value is used as a wrap-around marker)
  • padding is the alignment that's used for the string values. If it is set to 1 then strings will be packed as tightly as possible into the ring buffer (best space utilization). If you set it to std::hardware_destructive_interference_size then there will be no interference between different string values in the ring buffer, at the cost of space utilization.
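The maximum string length that follows from a given size_type can be computed directly (this just mirrors the description above; the variable template name is made up for illustration):

```cpp
#include <cassert>
#include <cstddef>
#include <limits>

// Maximum storable string length for a given size_type:
// the all-ones value is reserved as the wrap-around marker.
template <class T>
constexpr std::size_t max_string_len =
    static_cast<std::size_t>(std::numeric_limits<T>::max()) - 1;
```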

Usage example: godbolt

void producer() {
    lockfree_spsc_string_queue<unsigned short> queue(BUFFER_POINTER, BUFFER_SIZE, true);
    while(true) {
        // retry until successful
        while(!queue.push("foobar"));
    }
}

void consumer() {
    lockfree_spsc_string_queue<unsigned short> queue(BUFFER_POINTER, BUFFER_SIZE);

    while(true) {
        std::optional<std::string_view> result;
        // retry until successful
        while(!result) result = queue.front();

        std::cout << *result << std::endl;

        bool pop_result = queue.pop();
        assert(pop_result);
    }
}

boost::lockfree::spsc_queue::consume_all e.g. could be implemented like this (in terms of the 3 functions provided by this minimal implementation):

    template<class Functor>
    std::size_t consume_all(Functor&& f) {
        std::size_t cnt = 0;
        for(auto el = front(); el; el = front()) {
            std::forward<Functor>(f)(*el);
            bool pop_result = pop();
            assert(pop_result);
            ++cnt;
        }
        return cnt;
    }

Full implementation: godbolt

// wait-free single-producer, single consumer fifo queue
// 
// The constructor must be called once with `init = true` for a specific region.
// After the construction of the queue with `init = true` has succeeded additional instances
// can be created for the region by passing `init = false`.
template<
        std::unsigned_integral size_type,
        std::size_t padding = 1>
class lockfree_spsc_string_queue {
    // we use the max value of size_type as a special marker
    // to indicate that the writer needed to wrap-around early to accommodate a string value.
    // this means the maximum size a string entry can be is `size_type_max - 1`.
    static constexpr size_type size_type_max = std::numeric_limits<size_type>::max();

    // calculates the padding required after a T member
    // to align the next member onto the next cache line.
    template<class T>
    static constexpr std::size_t padding_to_next_cache_line =
        std::hardware_destructive_interference_size -
        sizeof(T) % std::hardware_destructive_interference_size;

public:
    // internal struct that will be placed in the shared memory region
    struct spsc_shm_block {
        using atomic_size = std::atomic<std::size_t>;

        // read head
        atomic_size read_offset;
        char pad1[padding_to_next_cache_line<atomic_size>];

        // write head
        atomic_size write_offset;
        char pad2[padding_to_next_cache_line<atomic_size>];

        std::size_t usable_buffer_size;
        char pad3[padding_to_next_cache_line<std::size_t>];

        // actual data
        std::byte buffer[];

        [[nodiscard]] static inline spsc_shm_block* init_shm(void* ptr, std::size_t size) {
            spsc_shm_block* block = open_shm(ptr, size);

            // atomics *must* be lock-free, otherwise they won't work across process boundaries.
            assert(block->read_offset.is_lock_free());
            assert(block->write_offset.is_lock_free());

            block->read_offset = 0;
            block->write_offset = 0;
            block->usable_buffer_size = size - offsetof(spsc_shm_block, buffer);
            return block;
        }

        [[nodiscard]] static inline spsc_shm_block* open_shm(void* ptr, std::size_t size) {
            // this type must be trivially copyable, otherwise we can't implicitly start its lifetime.
            // It also needs to have a standard layout for offsetof.
            static_assert(std::is_trivially_copyable_v<spsc_shm_block>);
            static_assert(std::is_standard_layout_v<spsc_shm_block>);

            // size must be at least as large as the header
            assert(size >= sizeof(spsc_shm_block));
            // ptr must be properly aligned for the header
            assert(reinterpret_cast<std::uintptr_t>(ptr) % alignof(spsc_shm_block) == 0);
        
            // implicitly start lifetime of spsc_shm_block
            return std::launder(reinterpret_cast<spsc_shm_block*>(ptr));
        }
    };

public:
    inline lockfree_spsc_string_queue(void* ptr, std::size_t size, bool init = false)
        : block(init ? spsc_shm_block::init_shm(ptr, size) : spsc_shm_block::open_shm(ptr, size))
    {
        // requires a buffer at least 1 byte larger than size_type
        assert(block->usable_buffer_size > sizeof(size_type));
    }

    // prevent copying / moving
    lockfree_spsc_string_queue(lockfree_spsc_string_queue const&) = delete;
    lockfree_spsc_string_queue(lockfree_spsc_string_queue&&) = delete;
    lockfree_spsc_string_queue& operator=(lockfree_spsc_string_queue const&) = delete;
    lockfree_spsc_string_queue& operator=(lockfree_spsc_string_queue&&) = delete;

    // producer: tries to add `str` to the queue.
    // returns true if the string has been added to the queue.
    [[nodiscard]] inline bool push(std::string_view str) {
        std::size_t write_size = pad_size(sizeof(size_type) + str.size());

        // impossible to satisfy write (not enough space / insufficient size_type)
        // (the asserts trap such misuse in debug builds; release builds just return false)
        if(write_size > max_possible_write_size() || str.size() >= size_type_max) [[unlikely]] {
            assert(write_size <= max_possible_write_size());
            assert(str.size() < size_type_max);
            return false;
        }

        std::size_t write_off = block->write_offset.load(std::memory_order_relaxed);
        std::size_t read_off = block->read_offset.load(std::memory_order_acquire);
        
        std::size_t new_write_off = write_off;
        if(try_align_for_push(read_off, new_write_off, write_size)) {
            new_write_off = push_element(new_write_off, write_size, str);
            block->write_offset.store(new_write_off, std::memory_order_release);
            return true;
        }

        if(new_write_off != write_off) {
            block->write_offset.store(new_write_off, std::memory_order_release);
        }

        return false;
    }

    // consumer: discards the current element to be read (if there is one)
    // returns true if an element has been removed, false otherwise.
    [[nodiscard]] inline bool pop() {
        std::size_t read_off;
        std::size_t str_size;
        if(!read_element(read_off, str_size)) {
            return false;
        }

        std::size_t read_size = pad_size(sizeof(size_type) + str_size);
        std::size_t new_read_off = advance_offset(read_off, read_size);
        block->read_offset.store(new_read_off, std::memory_order_release);

        return true;
    }

    // consumer: returns the current element to be read (if there is one)
    // this does not remove the element from the queue.
    [[nodiscard]] inline std::optional<std::string_view> front() {
        std::size_t read_off;
        std::size_t str_size;
        if(!read_element(read_off, str_size)) {
            return std::nullopt;
        }

        // return string_view into buffer
        return std::string_view{
            reinterpret_cast<std::string_view::value_type*>(&block->buffer[read_off + sizeof(size_type)]),
            str_size
        };
    }

private:
    // handles implicit and explicit wrap-around for the writer
    [[nodiscard]] inline bool try_align_for_push(
            std::size_t read_off,
            std::size_t& write_off,
            std::size_t write_size) {
        std::size_t cont_avail = max_avail_contiguous_write_size(write_off, read_off);

        // there is enough contiguous space in the buffer to push the string in one go
        if(write_size <= cont_avail) {
           return true;
        }

        // not enough contiguous space in the buffer.
        // check if the element could fit contiguously into
        // the buffer at the current write_offset.
        std::size_t write_off_to_end = block->usable_buffer_size - write_off;
        if(write_size <= write_off_to_end) {
            // element could fit at current position, but the reader would need
            // to consume more elements first
            // -> do nothing
            return false;
        }

        // element can NOT fit contiguously at current write_offset
        // -> we need a wrap-around
        std::size_t avail = max_avail_write_size(write_off, read_off);

        // not enough space for a wrap-around marker
        // -> implicit wrap-around
        if(write_off_to_end < sizeof(size_type)) {
            // the read marker has advanced far enough
            // that we can perform a wrap-around and try again.
            if(avail >= write_off_to_end) {
                write_off = 0;
                return try_align_for_push(read_off, write_off, write_size);
            }

            // reader must first read more elements
            return false;
        }

        // explicit wrap-around
        if(avail >= write_off_to_end) {
            std::memcpy(&block->buffer[write_off], &size_type_max, sizeof(size_type));
            write_off = 0;
            return try_align_for_push(read_off, write_off, write_size);
        }

        // explicit wrap-around not possible
        // (reader must advance first)
        return false;
    }

    // writes the element into the buffer at the provided offset
    // and calculates new write_offset
    [[nodiscard]] inline std::size_t push_element(
            std::size_t write_off,
            std::size_t write_size,
            std::string_view str) {
        // write size + string into buffer
        size_type size = static_cast<size_type>(str.size());
        std::memcpy(&block->buffer[write_off], &size, sizeof(size_type));
        std::memcpy(&block->buffer[write_off + sizeof(size_type)], str.data(), str.size());

        // calculate new write_offset
        return advance_offset(write_off, write_size);
    }

    // returns true if there is an element that can be read (and sets read_off & str_size)
    // returns false otherwise.
    // internally handles implicit and explicit wrap-around. 
    [[nodiscard]] inline bool read_element(std::size_t& read_off, std::size_t& str_size) {
        std::size_t write_off = block->write_offset.load(std::memory_order_acquire);
        std::size_t orig_read_off = block->read_offset.load(std::memory_order_relaxed);
        read_off = orig_read_off;
        str_size = 0;

        if(read_off == write_off) {
            return false;
        }

        // remaining space would be insufficient for a size_type
        // -> implicit wrap-around
        if(block->usable_buffer_size - read_off < sizeof(size_type)) {
            read_off = 0;
            if(read_off == write_off) {
                block->read_offset.store(read_off, std::memory_order_release);
                return false;
            }
        }

        size_type size;
        std::memcpy(&size, &block->buffer[read_off], sizeof(size_type));

        // wrap-around marker
        // -> explicit wrap-around
        if(size == size_type_max) {
            read_off = 0;
            if(read_off == write_off) {
                block->read_offset.store(read_off, std::memory_order_release);
                return false;
            }
            
            std::memcpy(&size, &block->buffer[read_off], sizeof(size_type));
        }

        // modified read_off -> store
        if(read_off != orig_read_off) {
            block->read_offset.store(read_off, std::memory_order_release);
        }

        str_size = size;
        return true;
    }

    // the maximum number of contiguous bytes we are currently able
    // to fit within the memory block (without wrapping around)
    [[nodiscard]] inline std::size_t max_avail_contiguous_write_size(
            std::size_t write_off,
            std::size_t read_off) {
        if(write_off >= read_off) {
            std::size_t ret = block->usable_buffer_size - write_off;
            ret -= read_off == 0 ?  1 : 0;
            return ret;
        }

        // write_off < read_off
        return read_off - write_off - 1;
    }

    // the maximum number of bytes we are currently able
    // to fit within the memory block (might include a wrap-around)
    [[nodiscard]] inline std::size_t max_avail_write_size(std::size_t write_off, std::size_t read_off) {
        // read_off - write_off - 1 may wrap around (unsigned arithmetic);
        // adding usable_buffer_size in the write_off >= read_off case
        // yields the correct result modulo the buffer size
        std::size_t avail = read_off - write_off - 1;
        if (write_off >= read_off)
            avail += block->usable_buffer_size;
        return avail;
    }

    // the largest possible size an element could be and still
    // fit within the memory block.
    [[nodiscard]] inline std::size_t max_possible_write_size() {
        return block->usable_buffer_size - 1;
    }

    // pads a given size to be a multiple of the template parameter padding
    [[nodiscard]] inline std::size_t pad_size(std::size_t size) {
        if(size % padding != 0) {
            size += padding - size % padding;
        }
        return size;
    }

    // advances offset and wraps around if required
    [[nodiscard]] inline std::size_t advance_offset(std::size_t offset, std::size_t element_size) {
        std::size_t new_offset = offset + element_size;
       
        // wrap-around
        if(new_offset >= block->usable_buffer_size) {
            new_offset -= block->usable_buffer_size;
        }

        return new_offset;
    }

private:
    spsc_shm_block* block;
};
Turtlefight
  • 9,420
  • 2
  • 23
  • 40
  • 2
    Re: keeping the ring buffer smaller: Cache locality for the ring buffer is of somewhat limited value, assuming it's written on one core and read on another. Data will have to get written back to L3 (on Intel CPUs at least) to get between cores. If your threads are sharing logical cores of the same physical core, then data only needs to commit to L1d cache for the other thread to hit. (Or if they're context-switching on the same actual core.) I guess multiple successive writes or reads can benefit from locality, though, for sequential access to multiple elements. But that works for chars – Peter Cordes Jan 03 '23 at 05:10
  • @PeterCordes That's a very good point! Thanks for your insights! (my knowledge about how caches work is unfortunately still pretty limited, I guess I have to read a few papers about that today :D ) I'll edit my post to include this. – Turtlefight Jan 03 '23 at 05:28
  • Interesting design choice to have the reader just get a pointer to the shared data, and only free up the ring buffer space when the reader is done. Your example `cout << *result << std::endl` doesn't pop until after at least one system call since it forces an ostream buffer flush with `endl` vs. `'\n'`. So it's a tradeoff between always forcing a copy even if sometimes the reader could just process in-place quickly enough, vs. freeing space faster to maybe allow a smaller ring buffer without blocking the writer unnecessarily. An always-copy API would make wrap-around invisible to the reader. – Peter Cordes Jan 03 '23 at 06:08
  • But an always-copy API would need the reader to provide a buffer, otherwise you're invoking dynamic allocation. So there are definite tradeoffs. I'd imagine your design is better for some use-cases (maybe especially with longer strings, or potentially-long strings?), while always-copy might be a win with strings that tend to be shorter, like up to 64 bytes. If the writer isn't nipping at your toes (almost-full buffer), accessing the shared data multiple times might not create extra cache misses so there might not be much advantage to getting all the queue bookkeeping done in one call. – Peter Cordes Jan 03 '23 at 06:11
  • @Turtlefight thanks for the brilliant answer and the code sample. I think you can even make this code into a GitHub repo! – madhur4127 Jan 03 '23 at 06:51
  • Also you can mention the point that `size` can be aligned at the 4-byte boundary (for uint32_t as size) for maximum efficiency on all platforms although I only care about x86 and on that it doesn't matter. – madhur4127 Jan 03 '23 at 06:59
  • 1
    @madhur4127 the alignment of the sizes within the buffer should not matter, because we always use `std::memcpy` to copy it into a sufficiently aligned local variable before accessing it. (so it should also work on architectures on which misaligned reads are not allowed) - on x86 you could probably get rid of the `std::memcpy` and read/write the size directly from/into the buffer, but I'm not sure if that would be significantly faster (misaligned read/write vs `std::memcpy`) - I guess you would need to profile that :) – Turtlefight Jan 03 '23 at 07:10
  • 1
    @PeterCordes The `std::cout` example is just intended as a basic example of the interface to demonstrate how it can be used. But I agree that any sort of blocking call should probably be avoided between `front()` and `pop()`. I tried to design this implementation so that copying out of the ring buffer would not be necessary (the consumer can still make a copy manually if required). So the `front()` + `pop()` combo needs 1 relaxed atomic read + 1 acquire atomic read more than a function that would make a copy and combine `front()` and `pop()` into a single call. – Turtlefight Jan 03 '23 at 07:50
  • 1
    Ok yeah, reasonable as long as you keep the `front()` and `pop()` close-ish, having callers copy manually if that won't be the case. If those end up too far apart, the extra reads may miss in cache again, if the writer touched them between accesses by the reader, but at least it's only extra reads. In a use-case where all callers will actually want to make a copy, you could let a string be split around the wrap-around and bake that into the API, otherwise yeah your design choice makes sense. – Peter Cordes Jan 03 '23 at 08:00
  • 1
    @Turtlefight: It's not safe or correct to dereference misaligned pointers in C++ even when compiling for a target where that's allowed in asm, like x86-64, as shown in [Why does unaligned access to mmap'ed memory sometimes segfault on AMD64?](https://stackoverflow.com/q/47510783) and the blogs linked from that. You need GNU C `typedef uint32_t unaligned_u32 __attribute__((aligned(1),may_alias))` or ISO C/C++ `memcpy`. Small fixed-size memcpy will inline and optimize away to a single mov instruction on ISAs where misaligned access Just Works, like x86 and AArch64. – Peter Cordes Jan 03 '23 at 08:04
  • 2
    @madhur4127: It's not a bad idea to always align the size by 4, though; that prevents it from being split across two cache lines, which would cause extra latency in the reader. Rounding up to the next multiple of 4 is cheap and easy, although does itself cost an instruction or two that adds some latency to that dependency chain. It gets it into a fresh cache line so the writer won't steal it from the reader if the buffer is almost full. Probably worth doing when it only takes 0 to 3 bytes of padding; maybe in more cases but that's a tuning choice. – Peter Cordes Jan 03 '23 at 08:09
  • 2
    @PeterCordes Now that I think about it, a function in the style of `boost::lockfree::spsc_queue::consume_one` might be a small improvement to my implementation ([implementation](https://github.com/boostorg/lockfree/blob/develop/include/boost/lockfree/spsc_queue.hpp#L181)) - it basically accepts a functor and combines `front()` + `pop()` into one function (calling the functor in between), thereby getting rid of the 2 additional atomic reads of my current design. - regarding the misaligned reads: good to know that my `std::memcpy` might even be optimized into a simple `mov` :D – Turtlefight Jan 03 '23 at 08:15
1

Create your own string type that does what you want:

struct MyString
{
    uint8_t size; // how much of data is actually populated
    std::array<char, 127> data; // null terminated? up to you
};

Now an spsc_queue<MyString> can store strings without separate allocation.

John Zwinck
  • 239,568
  • 38
  • 324
  • 436