1

Let's say I have a process A and a process B, and process A would like to pass a C string to process B through a shm_open() + mmap() shared memory.

What's the most latency efficient way?

The answer of this post suggested that after C++11, std::atomic is the right way to share data over shared memory.

However, I fail to see how I can write something to write a C string with something like this:

struct Buffer {
std::atomic<uint32_t> length;
std::atomic<char*> str;
} __attribute__((packed));

Given I have a shared memory created this way:

class SHM {
    char* _ptr;
public:
    SHM() {
        const auto handle = shm_open("myTest", O_RDWR|O_CREAT, 0666);
        const auto size =  4 * 1024 * 1024;
        if (-1 == ftruncate(handle, size)) {
            throw;
        }
        _ptr = (char*)mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);

        if(_ptr == MAP_FAILED){
            throw;
        }

        int rc = fchmod(handle, 0666);
        if (rc == -1) {
            throw;
        }
    }

    // assume to caller will do buffer.size.store(someLength, std::memory_order_release); after filling up Buffer::str
    Buffer& getBuffer() noexcept {
        return *reinrepret_cast<Buffer*>(_ptr);
    }

    Buffer& read() {
        auto& buffer = *reinrepret_cast<Buffer*>(_ptr);
        while (buffer.size.load(std::memory_order_acquire) > 0) {
            buffer.str.load(std::memory_order_relaxed);
            return buffer;
        }
    }
};

How can the caller to SHM::getBuffer() properly write to Buffer::str char by char so that process B can call SHM::read() to retrieve?

Does buffer.str.load(std::memory_order_relaxed) actually load atomically and correctly? I doubt that as it doesn't even know the length.

This is for Linux, X86-64, GCC 7.

Thanks in advance.

HCSF
  • 2,387
  • 1
  • 14
  • 40
  • Is it single-producer-single-consumer problem? – Maxim Egorushkin Jul 05 '18 at 16:11
  • Most efficient way to send text from one process to other process in UNIX (that Linux is) is to use pipe because that is what everything in Unixes have always used for that and so it is optimized to perfection. Use shared memory for communicating between multiple processes in multi-processor system. – Öö Tiib Jul 05 '18 at 16:11
  • @ÖöTiib It is an easy way, but not most efficient in terms of latency. Especially now that after Spectre patches syscalls are more expensive. Latency sensitive applications avoid doing syscalls. – Maxim Egorushkin Jul 05 '18 at 16:13
  • I think atomic is for threads, not processes. You do not discuss whether you process has threads. – William J Bagshaw Jul 05 '18 at 16:21
  • @WilliamJBagshaw You are mistaken. A process cannot be without at least 1 thread. – Maxim Egorushkin Jul 05 '18 at 16:22
  • @MaximEgorushkin Threads with an "s" multiple-treads if you like, anyway, I atomic adds nothing to two single threaded processes trying to communicate. Which is what this question is about. There needs to be an inter-process semaphore that the shared memory message has been sent. pipes would be one way forward. Either send the message in the pipe or use it to signal that the memory mapped message has been sent. In the pipe would be faster. – William J Bagshaw Jul 05 '18 at 16:43
  • @WilliamJBagshaw You are right that if the consumer needs to block till data is available it needs to use a semaphore or a process-shared mutex + condition-variable. For lowest latency possible though, the consumer has to busy-wait because it takes at least 1us to for the thread to wake up and run. – Maxim Egorushkin Jul 05 '18 at 22:11
  • @MaximEgorushkin there could be multiple processes writing to the same shared memory segment and multiple processes reading it. Ideally, the segment acts as a ringbuffer; I didn't specify ringbuffer because I don't want to complicate the discussion. Thanks for helping out in various posts! – HCSF Jul 06 '18 at 02:04
  • @HCSF The general case lock-free of multiple-producers-multiple-consumers has been a hot topic in the research and somewhat of a holy grail. Implementations based on `fetch_and_add` are promising, they only operate on ring-buffers of pointers (or anything with a sentinel value). – Maxim Egorushkin Jul 06 '18 at 03:57
  • Have a look at [Boost.Interprocess](https://theboostcpplibraries.com/boost.interprocess). – Maxim Egorushkin Jul 06 '18 at 04:13
  • @MaximEgorushkin yes, most implementations I saw online are based on some sentinel value. fetch_and_add() is used in most of them. Tho, I noticed that it is translated into lockadd instruction. I saw boost::interprocess long time ago but I didn't dig in as someone benchmarked it and the latency was quite high. I will take a look again. Maybe there is something I can reuse. Thanks! – HCSF Jul 06 '18 at 09:08

1 Answers1

1

Here is a working sketch for single-producer-single-consumer case (it doesn't matter if the producer/consumer threads from the same process or not), wait-free:

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <unistd.h>
#include <fcntl.h>
#include <utility>
#include <cstring>
#include <string>
#include <atomic>

class SingleProducerSingleConsumerIndexes {
    std::atomic<uint64_t> produced_ = {};
    std::atomic<uint64_t> consumed_ = {};

public: // Producer interface.
    uint64_t produced() {
        auto consumed = consumed_.load(std::memory_order_acquire); // Syncronizes with store 2.
        auto produced = produced_.load(std::memory_order_relaxed);
        if(produced != consumed || !produced)
            return produced;
        // Entire buffer was consumed. Rewind.
        produced_.store(0, std::memory_order_release); // Store 1.
        consumed_.store(0, std::memory_order_relaxed); // Store 3.
        return 0;
    }

    void produce(uint64_t end) {
        produced_.store(end, std::memory_order_release); // Store 1.
    }

public: // Consumer interface.
    std::pair<uint64_t, uint64_t> available() const {
        auto produced = produced_.load(std::memory_order_acquire); // Syncronizes with store 1.
        auto consumed = consumed_.load(std::memory_order_relaxed);
        // min handles the case of store 3 not visible yet.
        return {std::min(produced, consumed), produced};
    }

    void consume(uint64_t end) {
        consumed_.store(end, std::memory_order_release); // Store 2.
    }
};

class SharedMemoryStrings {
    void* p_;
    static constexpr int size = 4 * 1024 * 1024;
    static constexpr int buffer_size = size - sizeof(SingleProducerSingleConsumerIndexes);
public:
    SharedMemoryStrings() {
        auto handle = ::shm_open("/another-test", O_RDWR|O_CREAT, 0666);
        if(-1 == ::ftruncate(handle, size))
            throw;
        p_ = ::mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);
        ::close(handle);
        if(p_ == MAP_FAILED)
            throw;
    }

    ~SharedMemoryStrings() {
        ::munmap(p_, size);
    }

    void produce(std::string const& s) {
        auto* indexes = static_cast<SingleProducerSingleConsumerIndexes*>(p_);
        auto produced = indexes->produced();
        uint64_t new_end = produced + sizeof(uint64_t) + s.size();
        if(new_end > buffer_size)
            throw; // Out of buffer space.

        auto* buffer = reinterpret_cast<char*>(indexes + 1) + produced;
        uint64_t size = s.size();
        memcpy(buffer, &size, sizeof size);
        buffer += sizeof size;
        memcpy(buffer, s.data(), s.size());

        indexes->produce(new_end);
    }

    bool try_consume(std::string& s) {
        auto* indexes = static_cast<SingleProducerSingleConsumerIndexes*>(p_);
        auto available = indexes->available();
        auto consumed = available.first;
        auto produced = available.second;
        if(consumed == produced)
            return false; // No data available.

        auto* buffer = reinterpret_cast<char const*>(indexes + 1) + consumed;
        uint64_t size;
        memcpy(&size, buffer, sizeof size);
        buffer += sizeof size;
        // Reuse the string to minimize memory allocations.
        s.assign(buffer, size);

        indexes->consume(consumed + sizeof(uint64_t) + size);
        return true;
    }
};

int main(int ac, char** av) {
    if(ac > 1) {
        // Producer.
        SharedMemoryStrings a;
        for(int i = 1; i < ac; ++i)
            a.produce(av[i]);
    }
    else {
        // Consumer.
        SharedMemoryStrings a;
        for(std::string s;;) { // Busy-wait loop.
            if(a.try_consume(s)) // Reuse the string to minimize memory allocations.
                printf("%s\n", s.c_str());
            // else // Potential optimization.
            //     _mm_pause();
        }
    }
}

Notes:

  • Compile the code like g++ -o test -W{all,extra,error} -std=gnu++11 -O3 -DNDEBUG -march=native -pthread -lrt test.cc. Assuming this source is called test.cc.

  • Start the consumer with no arguments, ./test. The producer with arguments, like ./test hello world. The start order does not matter.

  • It is a single-producer-single-consumer solution. It is wait-free (producer and consumer calls complete in a fixed number of instructions, no loop), which is better than just lock-free (which doesn't guarantee completion in a fixed number of instructions). Cannot go faster that that.

  • On x86-64 these acquire and release atomic loads and stores compile into plain mov instructions because current x86-64 memory model is a bit too strong. However, using std::atomic and specific memory orders ensures that the compiler does not reorder instructions. And it also makes sure that the code compiles and works correctly on architectures with weaker memory models and inserts appropriate barriers if necessary, which volatile cannot possibly do. Like PowerPC, for example. Using volatile is the same as using std::memory_order_relaxed. See the assembly comparison.

  • produced_.store(end, std::memory_order_release); ensures that all previous stores (memcpy into the shared memory) made by producer thread become visible to consumer thread as soon as the effect of this store is visible by produced_.load(std::memory_order_acquire);. See http://preshing.com/20130823/the-synchronizes-with-relation/ for thorough treatment of the subject. Also std::memory_order says it best:

    memory_order_acquire A load operation with this memory order performs the acquire operation on the affected memory location: no reads or writes in the current thread can be reordered before this load. All writes in other threads that release the same atomic variable are visible in the current thread.

    memory_order_release A store operation with this memory order performs the release operation: no reads or writes in the current thread can be reordered after this store. All writes in the current thread are visible in other threads that acquire the same atomic variable and writes that carry a dependency into the atomic variable become visible in other threads that consume the same atomic.

  • The producer detects when the consumer has consumed all available data. In this case the producer rewinds the buffer to the start. This is done to avoid handling buffer wrapping for ring-buffer. If the consumer cannot process messages fast enough the buffer will get full eventually regardless.

  • It never calls SingleProducerSingleConsumerIndexes constructor. It relies on the fact that a new file is zero-initialized and that is what the constructor would do. In more complex scenarios it needs to invoke the constructor of shared data if the file has just been created. That can be done by creating a temporary file with a unique name first (if the file does not exist yet), mapping the file into memory and invoking the constructor. Then renaming that temporary file to the final name (rename is atomic). If renaming fails because the file already exists, delete the temporary file and start again.

  • The consumer does busy-waiting for lowest possible latency. If you would like the consumer to block while waiting it is possible to add a process shared mutex and condition variable to make that happen. It takes a few microseconds to wake up a thread waiting on a condition variable (futex in Linux) in the kernel, though. That would require calling SingleProducerSingleConsumerIndexes constructor to do all required initialization (e.g. initialize a robust adaptive process-shared mutex and a process-shared condition variable).

Maxim Egorushkin
  • 131,725
  • 17
  • 180
  • 271
  • your post is very detailed and code is very clean. Thanks! I think the main puzzle to me is that `p_` isn't `volatile`, and when `memcpy()` is called in `try_consume()`, why is it guaranteed to read the correct string out of the shared memory? In another [post](https://stackoverflow.com/questions/51168908/fail-to-read-through-shared-memory), without `volatile`, the compiler just optimizes away and so it doesn't even go to shared memory to read but probably reads from a register or cache. – HCSF Jul 06 '18 at 03:19
  • Is it because `SingleProducerSingleConsumerIndexes::produce()` and `SingleProducerSingleConsumerIndexes::available()` form an "atomic transaction" that when `produced_.load(std::memory_order_acquire)` is returned, the cache location for holding the string the producer has `memcpy()`-ed is invalidated, and so when the consumer tries to `memcpy()` the string out, the cache is invalid and so it will go to the shared memory? – HCSF Jul 06 '18 at 03:19
  • @HCSF Yes, acquire/release are the key. Added a bullet for you. – Maxim Egorushkin Jul 06 '18 at 03:43
  • so basically the trick is -- the writer tries to write all stuffs without any atomic/lock/volatile stuffs, and then uses std::memory_order_release on one atomic variable with a memory location, and then the reader uses std::memory_order_acquire on one atomic variable with the same memory location, then all the stuffs written by the writer (regardless whether the stuffs are written with atomic/lock/volatile) will become visible to the reader at that point? – HCSF Jul 06 '18 at 03:48
  • @HCSF For single-threaded processes `-pthread` is probably not needed. – Maxim Egorushkin Jul 06 '18 at 14:23
  • I am just concern that in order for g++ to interpret `std::atomic` and `std::memory_order_*` correctly, -pthread might be needed although they are single thread applications and assembly code shows just `mov` (I am not very familiar with assembly code, but the addresses do look differently with `std::memory_order_*`). – HCSF Jul 07 '18 at 01:52
  • Hi, sorry for coming back to this again. It seems like many people suggested casting from non atomic to atomic have undefined behavior. [link1](http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n4013.html) [link2](https://stackoverflow.com/questions/14858770/c11-stdatomic-fetch-add-vs-sync-fetch-and-add) [link3](https://stackoverflow.com/questions/8749038/how-to-use-stdatomic-efficiently). so your `static_cast` in `produce()` and `try_consume()` could be an issue? – HCSF Jul 07 '18 at 13:37
  • @HCSF UB does not happen here. We got memory and treat that memory as object. – Maxim Egorushkin Jul 09 '18 at 09:18
  • Yes, we should have sufficient shared memory to hold the atomic variables. But won't it have the issue that the compiler gets confused -- "However this is not guaranteed to be reliable, even on platforms on which one might expect it to work, since it may confuse type-based alias analysis in the compiler. A compiler may assume that an int is not also accessed as an atomic. (See 3.10, [Basic.lval], last paragraph.)" quoted from my [link1](http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n4013.html). Thanks! – HCSF Jul 09 '18 at 09:25
  • @HCSF That does not apply here. If it did you would not be able to use atomics with heap allocated memory. – Maxim Egorushkin Jul 09 '18 at 09:26