1

I'm trying to consume from a child process a synchronized queue in c++. I'm using this synchronized queue in C++ () (http://www.internetmosquito.com/2011/04/making-thread-safe-queue-in-c-i.html)

I modified the queue to be serializable in boost and also replaced the used boost::mutex io_mutex_ to use instead an inteprocess mutex (thanks @Sehe) boost::interprocess::interprocess_mutex io_mutex_ And when locking I changed every line that has boost::mutex::scoped_lock lock(io_mutex_); to scoped_lock<interprocess_mutex> lock(io_mutex_);

template<class T>
class SynchronizedQueue
{
    friend class boost::serialization::access;
    template<class Archive>
    void serialize(Archive & ar, const unsigned int version)
    {
        ar & sQueue;
        ar & io_mutex_;
        ar & waitCondition;
    }
    ... // queue implementation (see [http://www.internetmosquito.com/2011/04/making-thread-safe-queue-in-c-i.html][2])

}

In my Test app, I'm creating the synchronized queue and storing in it 100 instances of this class:

class gps_position
{
    friend class boost::serialization::access;
    template<class Archive>
    void serialize(Archive & ar, const unsigned int version)
    {
        ar & degrees;
        ar & minutes;
        ar & seconds;
    }
public:
 int degrees;
 int minutes;
 float seconds;

 gps_position() {};
 gps_position(int d, int m, float s) :
 degrees(d), minutes(m), seconds(s)
 {}
};

Common definitions between Consumer and producer:

 char *SHARED_MEMORY_NAME = "MySharedMemory";
 char *SHARED_QUEUE_NAME  =  "MyQueue";
 typedef SynchronizedQueue<gps_position> MySynchronisedQueue;

Producer process code:

    // Remove shared memory if it was created before
    shared_memory_object::remove(SHARED_MEMORY_NAME);
    // Create a new segment with given name and size
    managed_shared_memory mysegment(create_only,SHARED_MEMORY_NAME, 65536);
    MySynchronisedQueue *myQueue = mysegment.construct<MySynchronisedQueue>(SHARED_QUEUE_NAME)();
    //Insert data in the queue
    for(int i = 0; i < 100; ++i)  {
        gps_position position(i, 2, 3);
        myQueue->push(position);
    }
    // Start 1 process (for testing for now)
    STARTUPINFO info1={sizeof(info1)};
    PROCESS_INFORMATION processInfo1;
    ZeroMemory(&info1, sizeof(info1));
    info1.cb = sizeof info1 ; //Only compulsory field
    ZeroMemory(&processInfo1, sizeof(processInfo1));
    // Launch child process
    LPTSTR szCmdline = _tcsdup(TEXT("ClientTest.exe"));
    CreateProcess(NULL, szCmdline, NULL, NULL, TRUE, 0, NULL, NULL, &info1, &processInfo1);
    // Wait a little bit ( 5 seconds) for the started client process to load
    WaitForSingleObject(processInfo1.hProcess, 5000);

    /* THIS TESTING CODE WORK HERE AT PARENT PROCESS BUT NOT IN CLIENT PROCESS
    // Open the managed segment memory
    managed_shared_memory openedSegment(open_only, SHARED_MEMORY_NAME);
    //Find the synchronized queue using it's name
    MySynchronisedQueue *openedQueue = openedSegment.find<MySynchronisedQueue>(SHARED_QUEUE_NAME).first;
    gps_position position;
    while (true) {
        if (myQueue->pop(position)) {
            std::cout << "Degrees= " << position.degrees << " Minutes= " << position.minutes << " Seconds= " << position.seconds;
            std::cout << "\n";
        }
        else
            break;
    }*/


    // Wait until the queue is empty: has been processed by client(s)
    while(myQueue->sizeOfQueue() > 0) continue;

    // Close process and thread handles. 
    CloseHandle( processInfo1.hThread );

My consumer code is as follow:

    //Open the managed segment memory
    managed_shared_memory segment(open_only, SHARED_MEMORY_NAME);
    //Find the vector using it's name
    MySynchronisedQueue *myQueue = segment.find<MySynchronisedQueue>(SHARED_QUEUE_NAME).first;
    gps_position position;
    // Pop each position until the queue become empty and output its values
    while (true)
    {
        if (myQueue->pop(position)) { // CRASH HERE
            std::cout << "Degrees= " << position.degrees << " Minutes= " << position.minutes << " Seconds= " << position.seconds;
            std::cout << "\n";
        }
        else
            break;
    }

When I run the parent process (producer) that create the queue and create the child (consumer) process, the child crash when trying to 'pop' from the queue.

What I'm doing wrong here ? Any idea ? Thanks for any insight. This is my first app creating using boost and shared memory.

My goal is to be able to consume this queue from multiple process. In the example above I'm creating only one child process to make sure first it works before creating other child process. The idea is the queue will be filled in advance by items and multiple created process will 'pop' items from it without clashing on each other.

  • Wait. What. You want to _serialize_ a mutex and a condition variable? To put it bluntly: _You officially have no clue what you're doing._ I don't think I need to bother showing how to serialize the `std::queue` container adaptor. – sehe Oct 22 '14 at 18:22

1 Answers1

5

To the updated code:

  • you should be using interprocess_mutex if you're gonna share the queue; This implies a host of dependent changes.
  • your queue should be using a shared-memory allocator if you're gonna share the queue
  • the conditions should be raised under the mutex for reliable behaviour on all platforms
  • you failed to lock inside toString(). Even though you copy the collection, that's not nearly enough because the container may get modified during that copy.
  • The queue design makes much sense (what is the use of a "thread safe" function that returns empty()? It could be no longer empty/just empty before you process the return value... These are called race conditions and lead to really hard to track bugs
  • What has Boost Serialization got to do with anything? It seems just there to muddle the picture, because it's not required and not being used.
  • Likewise for Boost Any. Why is any used in toString()? Due to the design of the queue, the typeid is always gpsposition anyways.
  • Likewise for boost::lexical_cast<> (why are you doing string concatenation if you already have the stringstream anyways?)
  • Why are empty(), toString(), sizeOfQueue() not const?

I highly recommend to use boost::interprocess::message_queue. This seems to be what you actually wanted to use.

Here's a modified version that puts the container in shared memory and it works:

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/thread/lock_guard.hpp>
#include <sstream>

namespace bip = boost::interprocess;

template <class T> class SynchronizedQueue {

  public:
    typedef bip::allocator<T, bip::managed_shared_memory::segment_manager> allocator_type;
  private:
    bip::deque<T, allocator_type> sQueue;
    mutable bip::interprocess_mutex io_mutex_;
    mutable bip::interprocess_condition waitCondition;
  public:
    SynchronizedQueue(allocator_type alloc) : sQueue(alloc) {} 

    void push(T element) {
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
        sQueue.push_back(element);
        waitCondition.notify_one();
    }
    bool empty() const {
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
        return sQueue.empty();
    }
    bool pop(T &element) {
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);

        if (sQueue.empty()) {
            return false;
        }

        element = sQueue.front();
        sQueue.pop_front();

        return true;
    }
    unsigned int sizeOfQueue() const {
        // try to lock the mutex
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
        return sQueue.size();
    }
    void waitAndPop(T &element) {
        boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);

        while (sQueue.empty()) {
            waitCondition.wait(lock);
        }

        element = sQueue.front();
        sQueue.pop();
    }

    std::string toString() const {
        bip::deque<T> copy;
        // make a copy of the class queue, to reduce time locked
        {
            boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
            copy.insert(copy.end(), sQueue.begin(), sQueue.end());
        }

        if (copy.empty()) {
            return "Queue is empty";
        } else {
            std::stringstream os;
            int counter = 0;

            os << "Elements in the Synchronized queue are as follows:" << std::endl;
            os << "**************************************************" << std::endl;

            while (!copy.empty()) {
                T object = copy.front();
                copy.pop_front();
                os << "Element at position " << counter << " is: [" << typeid(object).name()  << "]\n";
            }
            return os.str();
        }
    }
};

struct gps_position {
    int degrees;
    int minutes;
    float seconds;

    gps_position(int d=0, int m=0, float s=0) : degrees(d), minutes(m), seconds(s) {}
};

static char const *SHARED_MEMORY_NAME = "MySharedMemory";
static char const *SHARED_QUEUE_NAME  =  "MyQueue";
typedef SynchronizedQueue<gps_position> MySynchronisedQueue;

#include <boost/interprocess/shared_memory_object.hpp>
#include <iostream>

void consumer()
{
    bip::managed_shared_memory openedSegment(bip::open_only, SHARED_MEMORY_NAME);
    
    MySynchronisedQueue *openedQueue = openedSegment.find<MySynchronisedQueue>(SHARED_QUEUE_NAME).first;
    gps_position position;

    while (openedQueue->pop(position)) {
        std::cout << "Degrees= " << position.degrees << " Minutes= " << position.minutes << " Seconds= " << position.seconds;
        std::cout << "\n";
    }
}

void producer() {
    bip::shared_memory_object::remove(SHARED_MEMORY_NAME);
    
    bip::managed_shared_memory mysegment(bip::create_only,SHARED_MEMORY_NAME, 65536);

    MySynchronisedQueue::allocator_type alloc(mysegment.get_segment_manager());
    MySynchronisedQueue *myQueue = mysegment.construct<MySynchronisedQueue>(SHARED_QUEUE_NAME)(alloc);

    for(int i = 0; i < 100; ++i)          
        myQueue->push(gps_position(i, 2, 3));

    // Wait until the queue is empty: has been processed by client(s)
    while(myQueue->sizeOfQueue() > 0) 
        continue;
}

int main() {
    producer();
    // or enable the consumer code for client:
    // consumer();
}
sehe
  • 374,641
  • 47
  • 450
  • 633
  • Thanks sehe for your response. Looking forward to a link for a solution that would work between process. For kick, I remplaced that mutex boost::interprocess::named_mutex But from boost::interprocess::named_mutex class comment, it's can't be placed in shared memory: //!A mutex with a global name, so it can be found from different //!processes. This mutex can't be placed in shared memory, and //!each process should have it's own named_mutex. – Jeff Lacoste Oct 22 '14 at 18:33
  • @JeffLacoste You do realize that this completely invalidates the question, then, right? You cannot seriously ask anything about this while not showing the relevant code (changed). And also, you're still trying to serialize that. It look just misguided. Finally, you **[_can_ put `boost::interprocess::interprocess_mutex` into shared memory](http://www.boost.org/doc/libs/1_56_0/doc/html/interprocess/synchronization_mechanisms.html#interprocess.synchronization_mechanisms.mutexes.mutexes_anonymous_example)** just fine – sehe Oct 22 '14 at 18:36
  • no i did know it. You did point me to 'interprocess_mutex, so i went and look into it and that is how i find that comment in the .hpp for named_mutex. Regarding the code changes, I did not went too far, I simply changed the mutex type and then i discovered the comment about not being shareable. I did try to compile that change variable type and it did not compile, but since i saw that comment in the .hpp, i stopped from looking into compilation errors and how to realy change the code. This is where i'm now. If you have an url, i'll take and post back result. Thanks again for your help. – Jeff Lacoste Oct 22 '14 at 18:49
  • @Jeff cheers, added two links I could find in reasonable time – sehe Oct 22 '14 at 18:58
  • Thanks again @Sehe. I updated the code and my original post with made changes to use boost::interprocess::interprocess_mutex and scoped_lock when locking. Unfortantely my consumer still crash. I also updated my post that ultimately i want to have the queue consumed from multiple child process. I went to the posted two url: first one was for one producer/one consumer and second url was for message queue. Thanks. I'm still looking for what is wrong with my new/updated code. – Jeff Lacoste Oct 22 '14 at 19:14
  • @JeffLacoste If you insist, I've fixed your code and commented on the things fixed. I'm not saying it makes the solution _better_, but at least it works (and runs cleanly under valgrind) – sehe Oct 22 '14 at 21:01
  • Thank you very much @Sehe. Your updated response and points were spot on. I did edit the queue using your recommended changes and it worked nicely. Thanks a lot. – Jeff Lacoste Oct 23 '14 at 12:14