I am new to ZeroMQ.
I have multiple publishers and a single subscriber (client), and I am looking for suggestions on the best way to implement this.
Currently the code uses a request-reply pattern between a single client and a server; this has to be extended to multiple publishers and a single subscriber.

This application is going to run on a QNX system whose toolchain does not support C++11, so zmq::multipart_t is not an option.

void TransportLayer::Init()
{
    socket.bind( "tcp://*:5555" );
}

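void TransportLayer::Receive()
{
    while ( true ) {
        zmq::message_t request;
        socket.recv( &request );

        // Layout: 2-byte message id, then the serialized protobuf payload.
        if ( request.size() >= sizeof( uint16_t ) ) {
            // Cast before offsetting; arithmetic on void* is not standard C++.
            const char* bytes = static_cast<const char*>( request.data() );
            uint16_t id = *( (const uint16_t*)bytes );
            string protoBuf( bytes + sizeof( uint16_t ),
                             request.size() - sizeof( uint16_t ) );
            InterfaceLayer::getInstance()->ParseProtoBufTable( protoBuf );
        }

        // A REP socket must answer each request before it can receive the next one.
        Send();
        usleep( 1 );
    }
}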

void TransportLayer::Send()
{
    zmq::message_t reply( 1 );
    memcpy( reply.data(), "#", 1 );

    socket.send( reply );
}

This is the code I have written. It was initially designed to listen to only one client; now I have to extend it to listen to multiple clients.

I tried using zmq::multipart_t, but it requires C++11 support, which the QNX version we are using does not provide.


I tried implementing the proposed solution.
I created two publishers that connect to the same static location.

Observations:

I )
Execution order:
1. Started the Subscriber
2. Started Publisher1 ( it published only one data value )

The Subscriber did not receive this data.

II )
Modified Publisher1 to send the same data in a while loop.
Execution order:
1. Started the Subscriber
2. Started Publisher1
3. Started Publisher2

Now I see that the Subscriber is receiving the data from both publishers.

This indicates that data loss is possible.

How do I ensure there is absolutely no data loss?
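
Observation I is consistent with how PUB/SUB behaves: a PUB socket drops any message sent before a subscription has reached it, so a single value published right after connect() can be lost. A minimal sketch of how such loss could at least be detected on the subscriber side is shown below. It assumes each publisher stamps its messages with its own id and a counter that starts at 0 and grows by 1 per message; neither that counter nor the `LossTracker` class is part of the code in this question, both are hypothetical. The sketch avoids C++11 features.

```cpp
#include <map>
#include <stdint.h>  // uint16_t, uint32_t (C99 header, usable without C++11)

// Hypothetical tracker: counts messages that never arrived, per publisher.
// Assumption: every publisher sends (publisherId, seq) with seq = 0, 1, 2, ...
class LossTracker
{
public:
    LossTracker() : lost_( 0 ) {}

    // Record a received message. Returns how many messages from this
    // publisher were skipped between the previous one and this one.
    uint32_t onMessage( uint16_t publisherId, uint32_t seq )
    {
        uint32_t gap = 0;
        std::map<uint16_t, uint32_t>::iterator it = next_.find( publisherId );
        if ( it == next_.end() ) {
            gap = seq;                 // everything before the first message seen
        } else if ( seq > it->second ) {
            gap = seq - it->second;    // hole in the sequence
        }
        // A lower-than-expected seq (e.g. a restarted publisher) simply
        // resets the expectation and is not counted as loss.
        next_[publisherId] = seq + 1;
        lost_ += gap;
        return gap;
    }

    uint32_t totalLost() const { return lost_; }

private:
    std::map<uint16_t, uint32_t> next_;  // next expected counter per publisher
    uint32_t lost_;                      // total number of missing messages
};
```

This only reports loss; it does not prevent it. Guaranteed delivery would need a different pattern or an acknowledgement scheme on top of the sockets.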


Here is my source code :

Publisher 2 :

dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) {

}
void dummyFrontEnd::Init()
{
    socket.connect("tcp://127.0.0.1:5555");
    cout << "Connecting .... " << endl;
}
void dummyFrontEnd::SendData() {

    while ( std::getline(file, line_str) ) {

        std::stringstream ss(line_str);
        std::string direction;

        double tdiff;
        int    i, _1939, pgn, priority, source, length, data[8];
        char   J, p, _0, dash, d;

        ss >> tdiff >> i >> J >> _1939 >> pgn >> p >> priority >> _0 >> source
           >> dash >> direction >> d >> length >> data[0] >> data[1] >> data[2]
           >> data[3] >> data[4] >> data[5] >> data[6] >> data[7];

        timestamp += tdiff;

        while (            gcl_get_time_ms() - start_time <
                uint64_t(timestamp * 1000.0) - first_time ) { usleep(1); }

        if (arguments.verbose) {
            std::cout << timestamp << " " << i << " " << J << " " << _1939 << " "
                << pgn << " " << p << " " << priority << " " << _0 << " " << source
                << " " << dash << " " << direction << " " << d << " " << length
                << " " << data[0] << " " << data[1] << " " << data[2] << " "
                << data[3] << " " << data[4] << " " << data[5] << " " << data[6]
                << " " << data[7] << std::endl;
        }

        uint64_t timestamp_ms = (uint64_t)(timestamp * 1000.0);

        protoTable.add_columnvalues(uint64ToString(timestamp_ms)); /* timestamp */
        protoTable.add_columnvalues(intToString(pgn));             /* PGN       */
        protoTable.add_columnvalues(intToString(priority));        /* Priority  */
        protoTable.add_columnvalues(intToString(source));          /* Source    */
        protoTable.add_columnvalues(direction);                    /* Direction */
        protoTable.add_columnvalues(intToString(length));          /* Length    */
        protoTable.add_columnvalues(intToString(data[0]));         /* data1     */
        protoTable.add_columnvalues(intToString(data[1]));         /* data2     */
        protoTable.add_columnvalues(intToString(data[2]));         /* data3     */
        protoTable.add_columnvalues(intToString(data[3]));         /* data4     */
        protoTable.add_columnvalues(intToString(data[4]));         /* data5     */
        protoTable.add_columnvalues(intToString(data[5]));         /* data6     */
        protoTable.add_columnvalues(intToString(data[6]));         /* data7     */
        protoTable.add_columnvalues(intToString(data[7]));         /* data8     */

        zmq::message_t create_values(protoTable.ByteSizeLong() + sizeof(uint16_t));
        *((uint16_t*)create_values.data()) = TABLEMSG_ID;  // ID
        // Cast to char* before offsetting; arithmetic on void* is not standard C++.
        protoTable.SerializeToArray(static_cast<char*>(create_values.data()) + sizeof(uint16_t), protoTable.ByteSizeLong());

        socket.send(create_values);

        protoTable.clear_columnvalues();
        usleep(1);
    }

}

Publisher 1 :

dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) {

}
void dummyFrontEnd::Init()
{
    socket.connect("tcp://127.0.0.1:5555");
    cout << "Connecting .... " << endl;
}
void dummyFrontEnd::SendData()
{
   cout << "In SendData" << endl;

   while(1) { 
       canlogreq canLogObj = canlogreq::default_instance();
                 canLogObj.set_fromhours(11);
                 canLogObj.set_fromminutes(7);
                 canLogObj.set_fromseconds(2);
                 canLogObj.set_fromday(16);
                 canLogObj.set_frommonth(5);
                 canLogObj.set_fromyear(2020);
                 canLogObj.set_tohours(12);
                 canLogObj.set_tominutes(7);
                 canLogObj.set_toseconds(4);
                 canLogObj.set_today(17);
                 canLogObj.set_tomonth(5);
                 canLogObj.set_toyear(2020);

       zmq::message_t logsnippetmsg(canLogObj.ByteSizeLong() + sizeof(uint16_t));

       *((uint16_t*)logsnippetmsg.data()) = 20;

       // Cast to char* before offsetting; arithmetic on void* is not standard C++.
       canLogObj.SerializeToArray(static_cast<char*>(logsnippetmsg.data()) + sizeof(uint16_t), canLogObj.ByteSizeLong());

       socket.send(logsnippetmsg);

       usleep(1);

       canLogObj.clear_fromhours();
       canLogObj.clear_fromminutes();
       canLogObj.clear_fromseconds();
       canLogObj.clear_fromday();
       canLogObj.clear_frommonth();
       canLogObj.clear_fromyear();
       canLogObj.clear_tohours();
       canLogObj.clear_tominutes();
       canLogObj.clear_toseconds();
       canLogObj.clear_today();
       canLogObj.clear_tomonth();
       canLogObj.clear_toyear();
   }

}

Subscriber :

TransportLayer::TransportLayer():context(1),socket(context,ZMQ_SUB){ }
void TransportLayer::Init()
{
    socket.bind("tcp://*:5555"); 
    socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
}
void TransportLayer::Receive()
{
    cout << "TransportLayer::Receive " << " I am in server " << endl;

    static int count = 1;
    // Producer thread.
    while ( true ){

        zmq::message_t request;
        string protoBuf;

        socket.recv(&request);

        uint16_t id = *((uint16_t*)request.data());

        cout << "TransportLayer : " << "request.data:  " << request.data() << endl;
        cout << "TransportLayer : count " << count << endl; count = count + 1;
        cout << "TransportLayer : request.data.size " << request.size() << endl;

        // Cast to char* before offsetting; arithmetic on void* is not standard C++.
        protoBuf = std::string(static_cast<char*>(request.data()) + sizeof(uint16_t), request.size() - sizeof(uint16_t));

        cout << "ProtoBuf : " << protoBuf << endl;

        InterfaceLayer *interfaceLayObj = InterfaceLayer::getInstance();

        switch(id) {
            case TABLEMSG_ID:   cout << "Canlyser" << endl;
                                interfaceLayObj->ParseProtoBufTable(protoBuf);
                                break; 
            case LOGSNIPPET_ID: cout << "LogSnip" << endl;
                                interfaceLayObj->ParseProtoBufLogSnippet(protoBuf);
                                interfaceLayObj->logsnippetSignal(); // publish the signal
                                break;
            default:            break;
        }

        usleep(1);

    }

}
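
The framing used by the publishers and the subscriber above (a 2-byte id followed by the serialized payload) can be sketched as a pair of stand-alone helpers. This is only an illustration under stated assumptions: `packFrame` and `unpackFrame` are hypothetical names, not part of cppzmq or of the original code, and the id is copied in host byte order exactly as the original code does, so both ends are assumed to share the same endianness. The code avoids C++11 features.

```cpp
#include <cstddef>   // std::size_t
#include <cstring>   // std::memcpy
#include <stdint.h>  // uint16_t (C99 header, usable without C++11)
#include <string>

// Hypothetical helper: build "2-byte id + payload" in one contiguous buffer.
std::string packFrame( uint16_t id, const std::string& payload )
{
    std::string frame( reinterpret_cast<const char*>( &id ), sizeof( id ) );
    frame += payload;
    return frame;
}

// Hypothetical helper: split a received buffer back into id and payload.
// Returns false when the buffer is too short to contain the id.
bool unpackFrame( const void* data, std::size_t size,
                  uint16_t& id, std::string& payload )
{
    if ( data == 0 || size < sizeof( uint16_t ) )
        return false;
    const char* bytes = static_cast<const char*>( data );
    std::memcpy( &id, bytes, sizeof( uint16_t ) );  // no unaligned access
    payload.assign( bytes + sizeof( uint16_t ), size - sizeof( uint16_t ) );
    return true;
}
```

On the sending side, the returned buffer could be copied into a zmq::message_t of frame.size() bytes before socket.send(); on the receiving side, unpackFrame() could be given request.data() and request.size().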
user3666197
user6868820
  • Questions asking for links are off-topic because Stack Overflow is _not_ a human-powered search engine. Please clarify your question: what code have you written and what issues are you experiencing? – ForceBru May 14 '20 at 14:16
  • http://zguide.zeromq.org/page:all#Divide-and-Conquer seems to be what you're looking for – Mathieu May 14 '20 at 14:24
  • @ForceBru I have added additional information – user6868820 May 14 '20 at 14:50
  • @Mathieu thanks for that. It seems promising. I shall play around for a bit and see if it's suitable. – user6868820 May 14 '20 at 14:51

1 Answer

Q : "How to use multiple publishers and a single client, using C++ < C++11?"

The QNX version was not explicitly stated, so let's work in general.

As noted in ZeroMQ Principles in less than Five Seconds, the single client ( being of a SUB archetype ) may zmq_connect(). That, however, comes at the cost of managing some mechanism, unknown to me, by which all current and any future PUBs zmq_bind() and the SUB then learns where to zmq_connect() in order to receive news from each newly bound PUB peer.

So it is much smarter to have the single SUB agent perform a zmq_bind() and let any current or future PUB perform a zmq_connect() as it comes, directed to the single, static, known location of the SUB. This does not mean they cannot use any of the available transport classes ( one inproc://, another tcp://, some ipc:// ), if QNX permits and the system architecture requires it, supposing, obviously, that the SUB agent has exposed a properly configured access point for receiving such connections.

Next, your SUB client has to configure its subscription topic filter, for example with an order to "Do Receive EVERYTHING!":

...
retCode = zmq_setsockopt( <aSubSocketINSTANCE>, ZMQ_SUBSCRIBE, "", 0 );
assert( retCode == 0 && "FAILED: at ZMQ_SUBSCRIBE order " );
...

Once this works, your next duty is to make the setup robust enough ( an explicit ZMQ_LINGER setting of 0, access policies, security, scaled resources, L2/L3-network protective measures, etc. ).

With that done, you can harness ZeroMQ to fit your QNX-system design needs.

halfer
user3666197
  • Thanks for the reply. I am using C++ < C++11. – user6868820 May 16 '20 at 08:18
  • The QNX version is 6.5.0. – user6868820 May 16 '20 at 08:30
  • I clicked the upvote just now. Stack Overflow popped up this message: "Thanks for the feedback! Votes cast by those with less than 15 reputation are recorded, but do not change the publicly displayed post score." – user6868820 May 17 '20 at 11:11
  • I have edited my original question with more details related to data loss. Any input on that would be of great value. Thanks – user6868820 May 17 '20 at 11:26
  • Great that your new code works. New problems should not creep into the original question. Message delivery is a complex subject of its own - feel free to open another question with these new aspects added. – user3666197 May 17 '20 at 12:32
  • I have opened another question. Thanks https://stackoverflow.com/questions/61852070/zmq-multiple-publisher-and-single-subscriber-data-loss-observed – user6868820 May 17 '20 at 12:51