1

I have this writer, I run this like so: ./writer 12 14

It creates two shared memory segments with an SPSC queue in each one.

The writer just sends text with a counter to the SPSC queue of each memory segment.

#include <boost/container/scoped_allocator.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/process.hpp>
#include <iostream>

// Makes the std::chrono duration literals (e.g. 1ms) available unqualified.
using namespace std::chrono_literals;
// Short aliases for the namespaces used throughout this program.
namespace bip = boost::interprocess;
namespace chro = std::chrono;
namespace blf = boost::lockfree;

// Allocator that hands out chars from a managed shared memory segment.
using char_alloc = bip::allocator<char, bip::managed_shared_memory::segment_manager>;
// String type whose storage is obtained through char_alloc.
using shared_string = bip::basic_string<char, std::char_traits<char>, char_alloc>;
// Single-producer/single-consumer queue of shared strings with a compile-time capacity of 200.
using ring_buffer = blf::spsc_queue<shared_string, blf::capacity<200>>;

int main(int argc, char* argv[]) {

    if (argc > 1) {

        std::string n1 = argv[1];
        std::string n2 = argv[2];

        const std::string shm_name1 = "segmentrb" + n1;
        const std::string shm_name2 = "segmentrb" + n2;
        const std::string qname = "queue";

        boost::interprocess::shared_memory_object::remove(shm_name1.c_str());
        boost::interprocess::shared_memory_object::remove(shm_name2.c_str());

        bip::managed_shared_memory seg1(bip::open_or_create, shm_name1.c_str(), 10'000);
        char_alloc char_alloc1(seg1.get_segment_manager());
        ring_buffer* qu1 = seg1.find_or_construct<ring_buffer>(qname.c_str())();

        bip::managed_shared_memory seg2(bip::open_or_create, shm_name2.c_str(), 10'000);
        char_alloc char_alloc2(seg2.get_segment_manager());
        ring_buffer* qu2 = seg2.find_or_construct<ring_buffer>(qname.c_str())();
        
        int counter = 0;

        while (true) {
                
            std::string text1 = "Text from 1, count ";
            text1.append(std::to_string(counter));

            qu1->push(shared_string(text1.c_str(), char_alloc1));

            std::string text2 = "Text from 2, count ";
            text2.append(std::to_string(counter));

            qu2->push(shared_string(text2.c_str(), char_alloc2));

            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            counter++;
        }
    }
}

Then I have this reader, which reads and pops from the SPSC queues of the two segments:

I run this with: ./reader 12 14

#include <boost/container/scoped_allocator.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/process.hpp>
#include <boost/unordered_map.hpp>
#include <iostream>

// Makes the std::chrono duration literals (e.g. 20ms) available unqualified.
using namespace std::chrono_literals;
// Short aliases for the namespaces used throughout this program.
namespace bip = boost::interprocess;
namespace chro = std::chrono;
namespace blf = boost::lockfree;

// These aliases are spelled exactly like the writer's aliases of the same names.
// Allocator that hands out chars from a managed shared memory segment.
using char_alloc = bip::allocator<char, bip::managed_shared_memory::segment_manager>;
// String type whose storage is obtained through char_alloc.
using shared_string = bip::basic_string<char, std::char_traits<char>, char_alloc>;
// Single-producer/single-consumer queue of shared strings with a compile-time capacity of 200.
using ring_buffer = blf::spsc_queue<shared_string, blf::capacity<200>>;

/// Reader: opens the two existing shared memory segments named after the ids
/// on the command line and keeps popping messages from the queue in each one,
/// printing them as "Rec1: ..." / "Rec2: ...".
///
/// Usage: ./reader <id1> <id2>
///
/// @return 1 when fewer than two ids are given or a queue is missing from its
///         segment; otherwise the loop never ends.
int main(int argc, char* argv[]) {
    // argv[1] AND argv[2] are read below, so two arguments are mandatory.
    if (argc < 3) {
        std::cerr << "usage: reader <id1> <id2>\n";
        return 1;
    }

    const std::string n1 = argv[1];
    const std::string n2 = argv[2];

    const std::string shm_name1 = "segmentrb" + n1;
    const std::string shm_name2 = "segmentrb" + n2;
    const std::string qname = "queue";

    bip::managed_shared_memory seg1(bip::open_only, shm_name1.c_str());
    char_alloc char_alloc1(seg1.get_segment_manager());
    ring_buffer* qu1 = seg1.find<ring_buffer>(qname.c_str()).first;

    bip::managed_shared_memory seg2(bip::open_only, shm_name2.c_str());
    char_alloc char_alloc2(seg2.get_segment_manager());
    ring_buffer* qu2 = seg2.find<ring_buffer>(qname.c_str()).first;

    // find() yields a null pointer when the segment holds no object of that name.
    if (qu1 == nullptr || qu2 == nullptr) {
        std::cerr << "queue \"" << qname << "\" not found in shared memory\n";
        return 1;
    }

    while (true) {

        shared_string v1(char_alloc1);
        shared_string v2(char_alloc2);

        // pop() reports whether an element was actually taken from the queue.
        const bool got1 = qu1->pop(v1);
        const bool got2 = qu2->pop(v2);

        if (got1) {
            std::cout << "Rec1: " << v1 << "\n";
        }

        if (got2) {
            std::cout << "Rec2: " << v2 << "\n";
        }

        // Both queues were empty: back off before polling again.
        if (!got1 && !got2) {
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
        }
    }
}

When I do `kill -9` on the reader, I get this on the writer:

terminate called after throwing an instance of 'boost::interprocess::bad_alloc'
  what():  boost::interprocess::bad_alloc
Aborted (core dumped)

How can I avoid the writer being killed?

Paul
  • 181
  • 4
  • 11

1 Answer

1

Reviewed your producer code, removing duplicate code:

Live On Coliru

#include <boost/container/scoped_allocator.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/process.hpp>
#include <iostream>

// Makes the std::chrono duration literals (e.g. 1ms) available unqualified.
using namespace std::chrono_literals;
// Short aliases for the Boost namespaces used below.
namespace bip = boost::interprocess;
namespace blf = boost::lockfree;
// Makes the standard library's literal suffixes available unqualified.
using namespace std::literals;

namespace Shared {
    using Segment                     = bip::managed_shared_memory;
    using Mgr                         = Segment::segment_manager;
    template <typename T> using Alloc = bip::allocator<T, Mgr>;

    using shared_string = bip::basic_string<char, std::char_traits<char>, Alloc<char>>;
    using ring_buffer   = blf::spsc_queue<shared_string, blf::capacity<200>>;

    struct Queue {
        Queue(std::string const& name, std::string const& qname = "queue")
            : _name(name)
            , _buffer(_segment.find_or_construct<ring_buffer>(qname.c_str())()) {}

        std::string const& name() const { return _name; }

        bool push(std::string const& item) {
            return _buffer->push(
                shared_string(item.begin(), item.end(), _segment.get_segment_manager()));
        }

      private:
        std::string const _name;
        struct pre_remover_t {
            pre_remover_t(std::string const& name) {
                bip::shared_memory_object::remove(name.c_str());
            }
        } _pre_remover{_name};
        bip::managed_shared_memory _segment{bip::open_or_create, _name.c_str(), 10'000};
        ring_buffer*               _buffer = nullptr;
    };
} // namespace Shared

int main(int argc, char* argv[]) {
    std::deque<Shared::Queue> queues;
    for (auto& arg : std::vector<std::string>(argv + 1, argv + argc))
        queues.emplace_back("segmentrb" + arg);

    for (int counter = 0; true; ++counter) {
        for (auto& q : queues)
            q.push("Text from " + q.name() + ", count " + std::to_string(counter));

        std::this_thread::sleep_for(1ms);
    }
}

Reading that makes the problem readily evident: You are pushing elements every 1ms.

Killing the consumer gives you 200*1ms = 0.2s before the queue is full. Obviously, the producer then fails with the exception indicating that the shared memory is out of capacity, because nothing is consuming (and thereby freeing) the queued strings any more.

UPDATE

From the comments, a version that combines producer and consumer:

Live On Coliru

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <deque>
#include <iomanip>
#include <iostream>
#include <ranges>
#include <thread>

// Short aliases for the Boost namespaces used below.
namespace bip = boost::interprocess;
namespace blf = boost::lockfree;
// Makes the standard library's literal suffixes (e.g. 1s, 2s) available unqualified.
using namespace std::literals;
// Unqualified access to the two helpers used in main().
using std::this_thread::sleep_for;
using std::ranges::views::transform;

namespace Shared {
    static constexpr std::string_view queue_name = "queue";

    using Segment                     = bip::managed_shared_memory;
    struct pre_remover_t {
        pre_remover_t(std::string const& name) { bip::shared_memory_object::remove(name.c_str()); }
    };
    using Mgr                         = Segment::segment_manager;
    template <typename T> using Alloc = bip::allocator<T, Mgr>;

    using String = bip::basic_string<char, std::char_traits<char>, Alloc<char>>;
    using ring_buffer   = blf::spsc_queue<String, blf::capacity<200>>;

    struct Producer {
        Producer(std::string const& name)
            : _name(name)
            , _buffer(_segment.find_or_construct<ring_buffer>(queue_name.data())()) {}

        std::string const& name() const { return _name; }

        bool push(std::string const& item) {
            std::cerr << "push: " << quoted(item) << std::endl;
            return _buffer->push(
                String(item.begin(), item.end(), _segment.get_segment_manager()));
        }

      private:
        std::string const _name;
        pre_remover_t     _pre_remover{_name};
        Segment           _segment{bip::open_or_create, _name.c_str(), 10'000};
        ring_buffer*      _buffer = nullptr;
    };

    struct Consumer {
        Consumer(std::string const& name)
            : _name(name)
            , _buffer(_segment.find_or_construct<ring_buffer>(queue_name.data())()) {}

        String pop() {
            String r(_segment.get_segment_manager());
            _buffer->pop(r);
            return r;
        }

      private:
        std::string const _name;
        Segment           _segment{bip::open_only, _name.c_str()};
        ring_buffer*      _buffer = nullptr;
    };
} // namespace Shared

/// Combined entry point.
///
/// Usage: <program> producer <id>...   -- push a counter message to every
///                                        "segmentrb<id>" queue once per second
///        <program> <other>  <id>...   -- any other first argument selects the
///                                        consumer, which polls the same queues
///
/// @return 1 when no mode argument is given; otherwise the loops never end.
int main(int argc, char* argv[]) {
    std::deque<std::string> args(argv + 1, argv + argc);

    // front()/pop_front() on an empty deque is undefined behaviour.
    if (args.empty()) {
        std::cerr << "usage: (producer|consumer) <id>...\n";
        return 1;
    }

    bool const is_producer = args.front() == "producer";
    args.pop_front();

    if (is_producer) {
        std::deque<Shared::Producer> queues;
        for (auto& arg : args)
            queues.emplace_back("segmentrb" + arg);

        for (int counter = 0; true; ++counter) {
            for (auto& q : queues)
                q.push("Text from " + q.name() + ", count " + std::to_string(counter));

            sleep_for(1s);
        }
    } else { // consumer
        std::deque<Shared::Consumer> queues;
        for (auto& arg : args)
            queues.emplace_back("segmentrb" + arg);

        for (;;) {
            auto no_data = true;

            for (int index = 0; auto&& v : queues | transform(&Shared::Consumer::pop)) {
                // Count every queue, not only those that yielded data, so that
                // "Rec<N>" always names the N-th queue given on the command line.
                ++index;

                if (!v.empty()) {
                    no_data = false;
                    std::cout << "Rec" << index << ": " << v << "\n";
                }
            }

            if (no_data) {
                std::cerr << "Consumer no-data cycle" << std::endl;
                sleep_for(2s);
            }
        }
    }
}

With a local demonstration as well:

enter image description here

sehe
  • 374,641
  • 47
  • 450
  • 633
  • 1
    (Didn't have time due to family things, but you should of course detect that the queue is full to avoid the exception, or handle it with exception handling) – sehe Feb 11 '23 at 17:37
  • 1
    For completeness, also refactored the consumer side: http://coliru.stacked-crooked.com/a/3f3a98cc47ac6364, local interactive demo: https://imgur.com/a/IWDoSlx. I hope this answers your other question about how to [move some of my procedural code answers into classes](https://stackoverflow.com/questions/72310393/does-boost-interprocess-support-sharing-objects-containing-pointers-between-proc/72312694?noredirect=1#comment133029499_72312694) – sehe Feb 11 '23 at 23:16
  • 1
    I am amazed by the quality of your answers, thank you very much. SO should add an Awesome response flag – Paul Feb 12 '23 at 02:25