I am having some problems with inter-process communication in ZMQ between several instances of a program:

  • I am using the Linux OS
  • I am using zeromq/cppzmq, the header-only C++ binding for libzmq

If I run two instances of this application (each in its own terminal), providing one with an argument to be a listener and the other with an argument to be a sender, the listener never receives a message. I have tried TCP and IPC to no avail.

#include <zmq.hpp>
#include <string>
#include <vector>    // std::vector, used for the pollitem list below
#include <iostream>
#include <cstdlib>   // atoi()
#include <cstring>   // strerror()
#include <cerrno>    // errno

int ListenMessage();
int SendMessage(std::string str);

zmq::context_t global_zmq_context(1);

int main(int argc, char* argv[] ) {
    std::string str = "Hello World";
    if (atoi(argv[1]) == 0) ListenMessage();
    else SendMessage(str);

    // note: no explicit zmq_ctx_destroy() call is needed here - the
    // zmq::context_t destructor terminates the context; passing the
    // object's address to zmq_ctx_destroy() would hand it a pointer
    // that is not the libzmq context handle
    return 0;
}


int SendMessage(std::string str) {
    assert(global_zmq_context);
    std::cout << "Sending \n";
    zmq::socket_t publisher(global_zmq_context, ZMQ_PUB);
    assert(publisher);

    int linger = 0;
    int rc = zmq_setsockopt(publisher, ZMQ_LINGER, &linger, sizeof(linger));
    assert(rc==0);

    rc = zmq_connect(publisher, "tcp://127.0.0.1:4506");
    if (rc == -1) {
        printf ("E: connect failed: %s\n", strerror (errno));
        return -1;
    }

    // zmq_send() returns the number of bytes queued, or -1 on error;
    // note the cppzmq bool-returning publisher.send() could never
    // compare equal to -1, so that error branch was dead code
    rc = zmq_send(publisher, str.data(), str.size(), 0);
    if (rc == -1) {
        printf ("E: send failed: %s\n", strerror (errno));
        return -1;
    }
    return 0;
}

int ListenMessage() {
    assert(global_zmq_context);
    std::cout << "Listening \n";
    zmq::socket_t subscriber(global_zmq_context, ZMQ_SUB);
    assert(subscriber);

    int rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
    assert(rc==0);

    int linger = 0;
    rc = zmq_setsockopt(subscriber, ZMQ_LINGER, &linger, sizeof(linger));
    assert(rc==0);

    rc = zmq_bind(subscriber, "tcp://127.0.0.1:4506");
    if (rc == -1) {
        printf ("E: bind failed: %s\n", strerror (errno));
        return -1;
    }

    std::vector<zmq::pollitem_t> p = {{subscriber, 0, ZMQ_POLLIN, 0}};
    while (true) {
        zmq::message_t rx_msg;
        // when timeout (the third argument here) is -1,
        // then block until ready to receive
        std::cout << "Still Listening before poll \n";
        zmq::poll(p.data(), 1, -1);
        std::cout << "Found an item \n"; // not reaching
        if (p[0].revents & ZMQ_POLLIN) {
            // received something on the first (only) socket
            subscriber.recv(&rx_msg);
            std::string rx_str;
            rx_str.assign(static_cast<char *>(rx_msg.data()), rx_msg.size());
            std::cout << "Received: " << rx_str << std::endl;
        }
    }
    return 0;
}

This code works if I run one instance of the program with two threads:

    std::thread t_sub(ListenMessage);
    sleep(1); // Slow joiner in ZMQ PUB/SUB pattern
    std::thread t_pub(SendMessage, str);
    t_pub.join();
    t_sub.join();

But I am wondering why, when running two instances of the program, the code above won't work.

Thanks for your help!

  • One difference is the shared global context. What if each thread gets its own? What if you don't allocate it globally? – Ulrich Eckhardt Feb 07 '20 at 21:12
  • Exactly, I was worried that each instance of the program is making its own copy of that global context – Alain Daccache Feb 07 '20 at 21:21
  • I believe the context will be shared when running two threads, which is why the code above works in that case, but when running two different instances of the program, it won't because of that global context perhaps. I have no idea how to work around it – Alain Daccache Feb 07 '20 at 21:24

1 Answer

In case one has never worked with ZeroMQ, one may enjoy first looking at "ZeroMQ Principles in less than Five Seconds" before diving into further details.


Q : wondering why when running two instances of the program the code above won't work?

This code will never fly - and it has nothing to do with thread-based or process-based [CONCURRENT] processing.

It is caused by a wrong design of the Inter-Process Communication.

ZeroMQ can provide for this any one of the supported transport-classes:
{ ipc:// | tipc:// | tcp:// | norm:// | pgm:// | epgm:// | vmci:// }, plus an even smarter one for in-process comms, the inproc:// transport-class, ready for inter-thread comms, where stack-less communication may enjoy the lowest ever latency, being just a memory-mapped policy.

The selection of the L3/L2-based networking stack for an Inter-Process Communication is possible, yet it is about the most "expensive" option.
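
For a localhost-only design, a minimal sketch of the cheaper choice may look like this ( assuming Linux + cppzmq; the ipc:// path /tmp/demo_pubsub.ipc is an arbitrary example, not anything mandated by ZeroMQ ):

    #include <zmq.hpp>

    // Sketch: the receiving side of the very same PUB/SUB channel,
    // re-keyed onto the cheaper ipc:// transport-class
    int main() {
        zmq::context_t ctx(1);
        zmq::socket_t  sub(ctx, ZMQ_SUB);
        sub.setsockopt(ZMQ_SUBSCRIBE, "", 0);    // subscribe to everything
        sub.bind("ipc:///tmp/demo_pubsub.ipc");  // instead of tcp://127.0.0.1:4506
        // ... then poll / recv exactly as in the question's ListenMessage()
        return 0;
    }

The sending side would .connect() to the very same "ipc:///tmp/demo_pubsub.ipc" endpoint.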


The Core Mistake :

Given that choice, even a solo process ( not speaking about a pair of processes ) will collide on an attempt to .bind() more than one AccessPoint onto the very same TCP/IP address:port#.
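
One can observe the collision directly with a minimal probe ( a sketch using the plain C-API; start two copies of it and keep the first one running ):

    #include <zmq.h>
    #include <cstdio>
    #include <cstring>
    #include <cerrno>

    // Sketch: the first process owns the endpoint; the second process'
    // zmq_bind() onto the very same address:port# fails with EADDRINUSE
    int main() {
        void* ctx = zmq_ctx_new();
        void* pub = zmq_socket(ctx, ZMQ_PUB);
        if (zmq_bind(pub, "tcp://127.0.0.1:4506") == -1)
            std::printf("E: bind failed: %s\n", std::strerror(errno));
        else
            std::getchar();           // hold the endpoint until ENTER is hit
        zmq_close(pub);
        zmq_ctx_term(ctx);
        return 0;
    }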


The Other Mistake :

Even when a solo programme is launched, both of the spawned threads attempt to .bind() their private AccessPoints, yet neither makes an attempt to .connect() to a matching "opposite" AccessPoint.

At least one has to successfully .bind(), and
at least one has to successfully .connect(), so as to get a "channel", here of the PUB/SUB Archetype ( a working pairing is shown in the consolidated sketch after the ToDo list below ).


ToDo:

  • decide about a proper, right-enough Transport-Class ( best to avoid the overkill of operating the full L3/L2-stack for a localhost/in-process IPC )
  • refactor the Address:port# management ( for 2+ processes not to fail on .bind()-(s) to the same ( hard-wired ) address:port# )
  • always detect and handle appropriately the returned {PASS|FAIL}-s from API calls
  • always set LINGER to zero explicitly ( you never know )
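
A consolidated sketch along those ToDo lines ( assuming Linux + cppzmq; the {pub|sub} command-line convention and the endpoint handling are illustrative choices of this sketch, not the asker's original design ):

    #include <zmq.hpp>
    #include <unistd.h>   // sleep()
    #include <cstdio>
    #include <cstring>
    #include <cerrno>
    #include <string>
    #include <iostream>

    // Sketch: role and endpoint both come from the command line, so that
    // 2+ processes can never collide on a hard-wired address:port#, e.g.
    //   ./demo sub ipc:///tmp/demo_pubsub.ipc
    //   ./demo pub ipc:///tmp/demo_pubsub.ipc
    int main(int argc, char* argv[]) {
        if (argc < 3) {
            std::cerr << "usage: " << argv[0] << " {pub|sub} <endpoint>\n";
            return 1;
        }
        const std::string role(argv[1]);
        const std::string endpoint(argv[2]);

        zmq::context_t ctx(1);
        int linger = 0;                            // always set LINGER explicitly

        if (role == "sub") {
            zmq::socket_t sub(ctx, ZMQ_SUB);
            sub.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
            sub.setsockopt(ZMQ_SUBSCRIBE, "", 0);
            if (zmq_bind(sub, endpoint.c_str()) == -1) {     // this side .bind()-s ...
                std::printf("E: bind failed: %s\n", std::strerror(errno));
                return -1;
            }
            zmq::message_t rx_msg;
            sub.recv(&rx_msg);                     // block until a message arrives
            std::cout << "Received: "
                      << std::string(static_cast<char*>(rx_msg.data()), rx_msg.size())
                      << std::endl;
        } else {
            zmq::socket_t pub(ctx, ZMQ_PUB);
            pub.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
            if (zmq_connect(pub, endpoint.c_str()) == -1) {  // ... this side .connect()-s
                std::printf("E: connect failed: %s\n", std::strerror(errno));
                return -1;
            }
            sleep(1);                              // PUB/SUB slow-joiner: give the
                                                   // subscription time to propagate
            zmq::message_t msg("Hello World", 11);
            pub.send(msg);
        }
        return 0;
    }

Start the sub side first; the slow-joiner sleep on the pub side then gives the subscription time to propagate before the only message is sent.
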
  • First, thanks a lot for your clear answer and reasoning. Correct, I have tried all combinations of connects and binds (usually bind is for listening and connect for broadcasting), but this still hasn't solved the issue. Also, I've edited the code to detect the exceptions from API calls (now reflected in the code above) as well as added linger and changed the tcp transport – Alain Daccache Feb 08 '20 at 17:35
  • NB: I've tried linger on the binding socket, the connection socket, and both. Still isn't working :/ – Alain Daccache Feb 08 '20 at 17:44
  • LINGER is a defensive step ( it saves your O/S resources from infinite hangups in case of an un-coordinated .close() or .term() or even a crash ). Next, there is no principal difference in which side .bind()-s or .connect()-s. What error-codes do you receive from the API on NACK-s? Last week I remember someone here who observed an error in the C++ wrapper/binding and started to use the pure C-lang API interfacing - did you read it or test the same ( root-cause isolation ) steps? – user3666197 Feb 08 '20 at 18:42
  • Agreed that messing with LINGER can lead to other weird behaviour - it's been added for a reason and setting to 0 defeats it. Instead it's best to explicitly use shutdown() in your code, and let linger backstop you as it's designed to do in case of communications failure. – Andrew Atrens Feb 25 '20 at 16:20