I want to make a lock-free ring buffer in shared memory with a runtime-specified maximum number of entries. I am basing my code on an example I found on GitHub. I successfully created a lock-free ring buffer in shared memory using this code.
In my case, I need to specify the maximum number of entries that the ring buffer can accept when it is constructed at runtime, rather than at compile time as in the example. The definition of shm::ring_buffer in the example is shown below.

namespace bip = boost::interprocess;

namespace shm
{
    using char_alloc = bip::allocator<char, bip::managed_shared_memory::segment_manager>;
    using shared_string = bip::basic_string<char, std::char_traits<char>, char_alloc>;
    using ring_buffer = boost::lockfree::spsc_queue<shared_string, boost::lockfree::capacity<200>>;
}

The shared memory segment is allocated as follows:

mQueuingSharedMem = std::make_unique<bip::managed_shared_memory>(
    bip::open_or_create, (mSharedMemoryName + "Queuing").c_str(), rSHMSize);

Per the GitHub example, when I construct the ring buffer with the maximum size specified at compile time via the optional boost::lockfree::capacity<> template parameter, everything works. (Note that the shared memory segment's construct method takes the number of ring_buffers in the [] and the constructor parameters in the parentheses that follow.)

auto pSharedMemAddr = mQueuingSharedMem->construct<
   shm::ring_buffer>(rQueuingPortName.c_str())[1](/*aMaxNumMessages*/);

I thought that in order to size the above shm::ring_buffer at runtime, I needed to remove the hard-coded boost::lockfree::capacity<200> template parameter from the boost::lockfree::spsc_queue and instead pass a maximum size for the shm::ring_buffer and the shared memory allocator for the shm::shared_string. I found a similar answer here but I was unable to adapt it to work with my code.

I made the following changes to the code that worked above in an attempt to specify the size of the ring buffer at runtime:

namespace bip = boost::interprocess;
namespace shm
{
    using char_alloc = bip::allocator<char, bip::managed_shared_memory::segment_manager>;
    using shared_string = bip::basic_string<char, std::char_traits<char>, char_alloc>;
    using ring_buffer = boost::lockfree::spsc_queue<shared_string/*, boost::lockfree::capacity<200>*/>;
}

    shm::char_alloc char_alloc(mQueuingSharedMem->get_segment_manager());
    auto pSharedMemAddr = mQueuingSharedMem->construct<
        shm::ring_buffer>(rQueuingPortName.c_str())[1](aMaxNumMessages);

I get a plethora of unintelligible compiler errors that I do not quite know how to fix:

1>------ Build started: Project: apex, Configuration: Debug x64 ------
1>APEXManager.cpp
1>C:\Users\johnc\main\extlibs\boost_1_65_1\boost/container/string.hpp(216): error C2512: 'boost::interprocess::allocator<char,boost::interprocess::segment_manager<CharType,MemoryAlgorithm,IndexType>>::allocator': no appropriate default constructor available
1>        with
1>        [
1>            CharType=char,
1>            MemoryAlgorithm=boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family,boost::interprocess::offset_ptr<void,ptrdiff_t,uintptr_t,0>,0>,
1>            IndexType=boost::interprocess::iset_index
1>        ]
1>C:\Users\johnc\main\extlibs\boost_1_65_1\boost/container/string.hpp(214): note: while compiling class template member function 'boost::container::container_detail::basic_string_base<Allocator>::members_holder::members_holder(void)'
1>        with
1>        [
1>            Allocator=shm::char_alloc
1>        ]
1>C:\Users\johnc\main\extlibs\boost_1_65_1\boost/container/string.hpp(100): note: see reference to function template instantiation 'boost::container::container_detail::basic_string_base<Allocator>::members_holder::members_holder(void)' being compiled
1>        with
1>        [
1>            Allocator=shm::char_alloc
1>        ]
1>C:\Users\johnc\main\extlibs\boost_1_65_1\boost/container/string.hpp(224): note: see reference to class template instantiation 'boost::container::container_detail::basic_string_base<Allocator>::members_holder' being compiled
1>        with
1>        [
1>            Allocator=shm::char_alloc
1>        ]
1>C:\Users\johnc\main\extlibs\boost_1_65_1\boost/container/string.hpp(506): note: see reference to class template instantiation 'boost::container::container_detail::basic_string_base<Allocator>' being compiled
1>        with
1>        [
1>            Allocator=shm::char_alloc
1>        ]
1>C:\Users\johnc\main\extlibs\boost_1_65_1\boost/lockfree/spsc_queue.hpp(557): note: see reference to class template instantiation 'boost::container::basic_string<char,std::char_traits<char>,shm::char_alloc>' being compiled
1>C:\Users\johnc\main\extlibs\boost_1_65_1\boost/lockfree/spsc_queue.hpp(555): note: while compiling class template member function 'boost::lockfree::detail::runtime_sized_ringbuffer<T,std::allocator<T>>::~runtime_sized_ringbuffer(void)'
1>        with
1>        [
1>            T=shm::shared_string
1>        ]
1>..\..\src\apex\APEXManager.cpp(660): note: see reference to function template instantiation 'boost::lockfree::detail::runtime_sized_ringbuffer<T,std::allocator<T>>::~runtime_sized_ringbuffer(void)' being compiled
1>        with
1>        [
1>            T=shm::shared_string
1>        ]
1>C:\Users\johnc\main\extlibs\boost_1_65_1\boost/lockfree/spsc_queue.hpp(693): note: see reference to class template instantiation 'boost::lockfree::detail::runtime_sized_ringbuffer<T,std::allocator<T>>' being compiled
1>        with
1>        [
1>            T=shm::shared_string
1>        ]
1>C:\Users\johnc\main\extlibs\boost_1_65_1\boost/interprocess/detail/in_place_interface.hpp(61): note: see reference to class template instantiation 'boost::lockfree::spsc_queue<shm::shared_string>' being compiled
1>C:\Users\johnc\main\extlibs\boost_1_65_1\boost/interprocess/detail/in_place_interface.hpp(58): note: while compiling class template member function 'void boost::interprocess::ipcdetail::placement_destroy<T>::destroy_n(void *,::size_t,size_t &)'
1>        with
1>        [
1>            T=shm::ring_buffer
1>        ]
1>C:\Users\johnc\main\extlibs\boost_1_65_1\boost/interprocess/detail/named_proxy.hpp(50): note: see reference to class template instantiation 'boost::interprocess::ipcdetail::placement_destroy<T>' being compiled
1>        with
1>        [
1>            T=shm::ring_buffer
1>        ]
1>C:\Users\johnc\main\extlibs\boost_1_65_1\boost/interprocess/detail/named_proxy.hpp(130): note: see reference to class template instantiation 'boost::interprocess::ipcdetail::CtorArgN<T,false,const MESSAGE_RANGE_TYPE &>' being compiled
1>        with
1>        [
1>            T=shm::ring_buffer
1>        ]
1>..\..\src\apex\APEXManager.cpp(365): note: see reference to function template instantiation 'T *boost::interprocess::ipcdetail::named_proxy<boost::interprocess::segment_manager<CharType,MemoryAlgorithm,IndexType>,T,false>::operator ()<const MESSAGE_RANGE_TYPE&>(const MESSAGE_RANGE_TYPE &) const' being compiled
1>        with
1>        [
1>            T=shm::ring_buffer,
1>            CharType=char,
1>            MemoryAlgorithm=boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family,boost::interprocess::offset_ptr<void,ptrdiff_t,uintptr_t,0>,0>,
1>            IndexType=boost::interprocess::iset_index
1>        ]
1>..\..\src\apex\APEXManager.cpp(365): note: see reference to function template instantiation 'T *boost::interprocess::ipcdetail::named_proxy<boost::interprocess::segment_manager<CharType,MemoryAlgorithm,IndexType>,T,false>::operator ()<const MESSAGE_RANGE_TYPE&>(const MESSAGE_RANGE_TYPE &) const' being compiled
1>        with
1>        [
1>            T=shm::ring_buffer,
1>            CharType=char,
1>            MemoryAlgorithm=boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family,boost::interprocess::offset_ptr<void,ptrdiff_t,uintptr_t,0>,0>,
1>            IndexType=boost::interprocess::iset_index
1>        ]
1>Done building project "apex.vcxproj" -- FAILED.
========== Build: 0 succeeded, 1 failed, 2 up-to-date, 0 skipped ==========
johnco3

1 Answer

Dynamic size will only work if you specify the interprocess-memory allocator: http://www.boost.org/doc/libs/1_65_1/doc/html/boost/lockfree/allocator.html.

Sadly, though spsc_queue does support stateful allocators:

Defines the allocator. boost.lockfree supports stateful allocator and is compatible with Boost.Interprocess allocators

It does NOT seem to support the uses_allocator<> protocol required to pass the allocator down to its element (shared_string), not even when using scoped_allocator_adaptor¹.
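To make the missing piece concrete, here is a minimal standard-library sketch of what the uses_allocator<> protocol does in a container that supports it. Arena and ArenaAlloc are hypothetical stand-ins for the segment manager and bip::allocator (stateful, no default constructor); std::vector with std::scoped_allocator_adaptor plays the role that spsc_queue does not play.

```cpp
#include <cstddef>
#include <new>
#include <scoped_allocator>
#include <string>
#include <vector>

// Hypothetical stand-in for a shared memory segment: it only counts allocations.
struct Arena { std::size_t allocations = 0; };

// Hypothetical stand-in for bip::allocator: stateful, and deliberately
// without a default constructor.
template <typename T>
struct ArenaAlloc {
    using value_type = T;
    Arena* arena;

    explicit ArenaAlloc(Arena* a) noexcept : arena(a) {}
    template <typename U>
    ArenaAlloc(const ArenaAlloc<U>& other) noexcept : arena(other.arena) {}

    T* allocate(std::size_t n) {
        ++arena->allocations;
        return static_cast<T*>(::operator new(n * sizeof(T)));
    }
    void deallocate(T* p, std::size_t) noexcept { ::operator delete(p); }

    template <typename U>
    bool operator==(const ArenaAlloc<U>& rhs) const noexcept { return arena == rhs.arena; }
    template <typename U>
    bool operator!=(const ArenaAlloc<U>& rhs) const noexcept { return arena != rhs.arena; }
};

// Like shared_string: it cannot be default-constructed, because its
// allocator cannot be.
using String  = std::basic_string<char, std::char_traits<char>, ArenaAlloc<char>>;
using Scoped  = std::scoped_allocator_adaptor<ArenaAlloc<String>>;
using Strings = std::vector<String, Scoped>;

// Returns true if an emplaced element received the container's allocator.
bool element_inherits_allocator() {
    Arena arena;
    Scoped alloc{ArenaAlloc<String>(&arena)};
    Strings strings(alloc);
    // The adaptor appends the allocator to the String constructor arguments.
    strings.emplace_back("hello");
    return strings.front().get_allocator().arena == &arena;
}

// Returns how many allocations went through the arena for one long element.
std::size_t arena_allocations_for_long_string() {
    Arena arena;
    Scoped alloc{ArenaAlloc<String>(&arena)};
    Strings strings(alloc);
    strings.emplace_back("a string long enough to need heap storage, not SSO");
    return arena.allocations;
}
```

A container without this support has no allocator to hand to its elements, so it has to default-construct them, which is the C2512 error in the question.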

I ran into this before:

So my suggestion would be to remove one of the ingredients:

  • make the element a non-dynamic container (containers of containers always require scoped allocator awareness or explicit element construction)
  • make the queue fixed-size (this is usually a good idea for shared memory situations, IYAM)
  • add a layer of indirection...

On the latter, you could make the queue store a managed bip::shared_ptr<shared_string> instead:

Live On Coliru

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/managed_mapped_file.hpp> // for Coliru
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>

#include <boost/lockfree/spsc_queue.hpp>

#include <atomic>   // std::atomic_size_t
#include <iostream>
#include <string>   // std::string, std::to_string

/// noisy - traces special members
struct noisy {
    noisy& operator=(noisy&&) noexcept { std::cout << "operator=(noisy&&)\n"; return *this;      } 
    noisy& operator=(const noisy&)     { std::cout << "operator=(const noisy&)\n"; return *this; } 
    noisy(const noisy&)                { std::cout << "noisy(const noisy&)\n";                   } 
    noisy(noisy&&) noexcept            { std::cout << "noisy(noisy&&)\n";                        } 
    ~noisy()                           { std::cout << "~noisy()\n";                              } 
    noisy()                            { std::cout << "noisy()\n";                               } 
};

namespace bip = boost::interprocess;
namespace blf = boost::lockfree;

namespace Shared {
    using Segment = bip::managed_mapped_file; // Coliru unsupported: managed_shared_memory;
    using Manager = Segment::segment_manager;
    template <typename T> using Alloc = bip::allocator<T, Manager>;

    using String = bip::basic_string<char, std::char_traits<char>, Alloc<char> >;

    // using Element = String;
    // For debug/demonstration
    struct Element : String, noisy { using String::String; }; // inherit constructors

    using Ptr = bip::managed_shared_ptr<Element, Segment>::type;

    using Buffer = blf::spsc_queue<Ptr, blf::allocator<Alloc<Ptr> > >;
}

static std::string unique_id_gen() {
    static std::atomic_size_t s_gen { 0 };
    return "buffer_element" + std::to_string(++s_gen);
}

int main() {
    struct shm_remove {
        shm_remove() { bip::shared_memory_object::remove("MySharedMemory"); }
        ~shm_remove() { bip::shared_memory_object::remove("MySharedMemory"); }
    } remover;

    Shared::Segment segment(bip::create_only, "MySharedMemory", 4 << 20);

    auto& buffer = *segment.construct<Shared::Buffer>(bip::unique_instance)[1](20, segment.get_segment_manager());

    auto create = [&segment](auto&&... args) {
        return make_managed_shared_ptr(segment.construct<Shared::Element>(unique_id_gen().c_str())
                (
                    std::forward<decltype(args)>(args)...,
                    segment.get_segment_manager()
                ), segment);
    };

    std::cout << "Pushing\n";

    for (auto msg : { "hello", "world", "bye", "cruel", "world" })
        buffer.push(create(msg));

    std::cout << "Popping\n";
    {
        Shared::Ptr into;
        while (buffer.pop(into)) {
            std::cout << "Popped: '" << *into << "'\n";
        }
        std::cout << "Going out of scope\n";
    } // RAII
    std::cout << "Out of scope\n";

    {
        // make sure any other owned queue elements are freed if the queue is destroyed before it's empty:
        for (auto msg : { "HELLO", "WORLD", "BYE", "CRUEL", "WORLD" })
            buffer.push(create(msg));

        std::cout << "Destroying buffer containing 5 elements\n";
        segment.destroy<Shared::Buffer>(bip::unique_instance);
    }
}

Prints:

Pushing
noisy()
noisy()
noisy()
noisy()
noisy()
Popping
Popped: 'hello'
~noisy()
Popped: 'world'
~noisy()
Popped: 'bye'
~noisy()
Popped: 'cruel'
~noisy()
Popped: 'world'
Going out of scope
~noisy()
Out of scope
noisy()
noisy()
noisy()
noisy()
noisy()
Destroying buffer containing 5 elements
~noisy()
~noisy()
~noisy()
~noisy()
~noisy()

¹ search my answers on how to use these with other containers of containers in shared memory
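The first suggestion above (a non-dynamic element) can be sketched with the standard library alone. FixedString and Message are hypothetical names, the 64-byte capacity is an arbitrary assumption, and over-long input is silently truncated in this sketch.

```cpp
#include <array>
#include <cstddef>
#include <cstring>
#include <string>
#include <type_traits>

// Hypothetical fixed-capacity message: all storage is inline, so the element
// needs no allocator and can be default-constructed and copied freely.
template <std::size_t N>
struct FixedString {
    std::array<char, N> data{};
    std::size_t size = 0;

    FixedString() = default;

    // Copies at most N bytes; longer input is truncated (an assumption of
    // this sketch, not a recommendation).
    explicit FixedString(const std::string& s)
        : size(s.size() < N ? s.size() : N) {
        std::memcpy(data.data(), s.data(), size);
    }

    std::string str() const { return std::string(data.data(), size); }
};

using Message = FixedString<64>;

// The properties a queue element needs when the queue cannot pass an
// allocator down to it.
static_assert(std::is_default_constructible<Message>::value,
              "element must be default-constructible");
static_assert(std::is_trivially_copyable<Message>::value,
              "element holds no pointers into process-local memory");
```

With such an element, the Buffer alias from the listing above would presumably become blf::spsc_queue<Message, blf::allocator<Alloc<Message> > >, constructed with the same (size, segment manager) arguments; that substitution is untested here.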

sehe
  • Why are the managed shared mem pointers necessary? Shouldn't it be possible to make a Buffer as in [other of your answers](https://stackoverflow.com/a/62515428/1433554) with spsc_queue with capacity defined during runtime? – alex Nov 18 '20 at 11:03
    @alex Sure, the Gist linked in the question was from [this answer](https://stackoverflow.com/questions/22207546/shared-memory-ipc-synchronization-lock-free/22209595#22209595), which shows that, but for the compile-time-capacity version. The documentation requires ["T must have a default constructor" and "T must be copyable"](https://www.boost.org/doc/libs/1_74_0/doc/html/boost/lockfree/spsc_queue.html). The string type doesn't satisfy that criterion, but the [shared pointer does](https://www.boost.org/doc/libs/1_74_0/doc/html/boost/interprocess/shared_ptr.html#idm46159920765440-bb) – sehe Nov 18 '20 at 14:41
    (Note that even if you could teach spsc_queue to default-initialize using the segment manager, it would turn out to be pretty inefficient due to all the extraneous copying of `Elements` constructed in the segment). – sehe Nov 18 '20 at 15:14
    Thanks for the answers. I am actually struggling to implement serialized cv::Mat exchange between processes, and a string seemed like a way forward. But it's out of the scope of this question, so I posted a new [question](https://stackoverflow.com/questions/64897387/how-to-construct-boost-spsc-queue-with-runtime-size-parameter-to-exchange-cvma). – alex Nov 18 '20 at 16:45