
I have a follow-up to "How to implement Pub-Sub Network with a Proxy by using XPUB and XSUB in ZeroMQ(C++)?"

That question requested a C++ proxy using XSUB and XPUB. The answer given is essentially the proxy main() function quoted below.

I extended this proxy to a full working example including a publisher and subscriber. The catch is that my code only works with dealer / router options (as shown in comments below). With the actual (uncommented) XPUB / XSUB options below, subscribers don't get messages. What's going wrong? Is there a tweak to get messages to arrive?

Proxy not working with XPUB/XSUB (working dealer / router in comments)

#include <zmq.hpp>

int main(int argc, char* argv[]) {
    zmq::context_t ctx(1);
    zmq::socket_t frontend(ctx, /*ZMQ_ROUTER*/ ZMQ_XSUB);
    zmq::socket_t backend(ctx, /*ZMQ_DEALER*/ ZMQ_XPUB);
    frontend.bind("tcp://*:5570");
    backend.bind("tcp://*:5571");
    zmq::proxy(frontend, backend, nullptr);
    return 0;
}

Subscriber not working with ZMQ_SUB (working dealer / router option in comments)

#include <cstring>
#include <iostream>
#include <string>
#include <zmq.hpp>

std::string GetStringFromMessage(const zmq::message_t& msg) {
    char* tmp = new char[msg.size()+1];
    memcpy(tmp,msg.data(),msg.size());
    tmp[msg.size()] = '\0';
    std::string rval(tmp);
    delete[] tmp;
    return rval;
}

int main(int argc, char* argv[]) {
    zmq::context_t ctx(1);
    zmq::socket_t socket(ctx, /*ZMQ_DEALER*/ ZMQ_SUB);
    socket.connect("tcp://localhost:5571");
    while (true) {
        zmq::message_t identity;
        zmq::message_t message;
        socket.recv(&identity);
        socket.recv(&message);
        std::string identityStr(GetStringFromMessage(identity));
        std::string messageStr(GetStringFromMessage(message));
        std::cout << "Identity: " << identityStr << std::endl;
        std::cout << "Message: "  << messageStr  << std::endl;
    }
}

Publisher not working with ZMQ_PUB (working dealer / router option in comments)

#include <unistd.h>
#include <cstdio>
#include <cstring>
#include <sstream>
#include <zmq.hpp>

int main (int argc, char* argv[])
{
    // Context
    zmq::context_t ctx(1);

    // Create a socket and set its identity attribute
    zmq::socket_t socket(ctx, /*ZMQ_DEALER*/ ZMQ_PUB);
    char identity[10] = {};
    sprintf(identity, "%d", getpid());
    socket.setsockopt(ZMQ_IDENTITY, identity, strlen(identity));
    socket.connect("tcp://localhost:5570");

    // Send some messages
    unsigned int counter = 0;
    while (true) {
        std::ostringstream ss;
        ss << "Message #" << counter << " from PID " << getpid();
        socket.send(ss.str().c_str(),ss.str().length());
        counter++;
        sleep(1);
    }
    return 0;
}
Community
Predrag3141
  • Seems like a slow joiners problem. I'm running into the same issue - subscribers fail to get publisher messages unless I add a `sleep(1)` to the publisher before sending anything. I have yet to find a working solution. Seems like XPUB/XSUB is a broken construct – Gillespie Mar 30 '17 at 22:45
  • Thanks RPGillespie for looking into this. – Predrag3141 Apr 03 '17 at 15:25
  • Dealer / Router does not do what XPUB / XSUB does: each message is routed to a unique subscriber with Dealer / Router. So I had no working example sending to all subscribers at the time I asked the question. I worked around the fact that XPUB / XSUB was not working by using a ZMQ_PUSH socket to publish, and a ZMQ_PULL socket as the frontend in the proxy. – Predrag3141 Apr 03 '17 at 15:27
  • I was able to get a decent XPUB/XSUB solution working using PUSH/PULL as follows: http://stackoverflow.com/questions/43129714/zeromq-xpub-xsub-serious-flaw – Gillespie Apr 03 '17 at 18:18
  • what does "not working" mean? I was able to proxy a working PUB/SUB example in about 20 minutes (changing the publisher to have multiple publishing threads, federated through XSUB/proxy/XPUB) – jwm Apr 01 '21 at 19:13
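
To make the delivery difference raised in the comments concrete, here is a toy model in plain C++. It does not use ZeroMQ at all; `fan_out`, `round_robin`, and `Inboxes` are hypothetical names invented for this illustration. It only sketches the two distribution patterns: PUB/XPUB-style fan-out, where every subscriber gets every message, and DEALER-style round-robin, where each message goes to exactly one peer.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Toy model, not libzmq code: each inner vector is one subscriber's inbox.
using Inboxes = std::vector<std::vector<std::string>>;

// PUB/XPUB-style fan-out: every subscriber gets a copy of every message.
Inboxes fan_out(const std::vector<std::string>& messages, std::size_t subscribers) {
    Inboxes inboxes(subscribers);
    for (const std::string& msg : messages) {
        for (auto& inbox : inboxes) {
            inbox.push_back(msg);
        }
    }
    return inboxes;
}

// DEALER-style round-robin: each message goes to exactly one subscriber.
Inboxes round_robin(const std::vector<std::string>& messages, std::size_t subscribers) {
    Inboxes inboxes(subscribers);
    if (subscribers == 0) {
        return inboxes;  // nobody connected, nothing to distribute
    }
    std::size_t next = 0;
    for (const std::string& msg : messages) {
        inboxes[next].push_back(msg);
        next = (next + 1) % subscribers;
    }
    return inboxes;
}
```

With four messages and two subscribers, the fan-out model gives each subscriber all four messages, while the round-robin model gives each subscriber only two. That is why a dealer / router proxy that appears to "work" is not a substitute for XPUB / XSUB.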

2 Answers


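In the subscriber code, you never subscribe to any messages. A ZMQ_SUB socket starts with no subscriptions and silently drops everything until you set one. Add the line:

socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);

either before or after the line:

socket.connect("tcp://localhost:5571");

in your subscriber code. The empty filter subscribes to all messages.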
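
As a rough illustration of why the empty filter matters, here is a small model of SUB-side filtering in plain C++. This is not libzmq's implementation, and `is_delivered` is a hypothetical helper made up for this sketch. It only mirrors the documented rule: a message is delivered when at least one subscription is a prefix of it, and a socket with no subscriptions receives nothing.

```cpp
#include <string>
#include <vector>

// Toy model of SUB-side filtering; not libzmq's actual implementation.
// A message is delivered if at least one subscription is a prefix of it.
bool is_delivered(const std::vector<std::string>& subscriptions,
                  const std::string& message) {
    for (const std::string& prefix : subscriptions) {
        // compare(0, n, prefix) == 0 means the first n bytes equal prefix.
        // An empty prefix therefore matches every message.
        if (message.compare(0, prefix.size(), prefix) == 0) {
            return true;
        }
    }
    return false;  // no subscriptions, or none matched: message is dropped
}
```

In this model, an empty subscription list (the state of the subscriber in the question) drops every message, while a list containing only `""` accepts everything, which is what `setsockopt(ZMQ_SUBSCRIBE, "", 0)` asks for.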

Dmitriy
Ayush

broker example

#include <cassert>
#include <zmq.hpp>

int main(int argc, char* argv[]) {
    void* ctx = zmq_ctx_new();
    assert(ctx);

    // Publishers connect to the XSUB frontend; subscribers connect to the XPUB backend.
    void* frontend = zmq_socket(ctx, ZMQ_XSUB);
    assert(frontend);
    void* backend = zmq_socket(ctx, ZMQ_XPUB);
    assert(backend);

    int rc = zmq_bind(frontend, "tcp://*:5570");
    assert(rc == 0);
    rc = zmq_bind(backend, "tcp://*:5571");
    assert(rc == 0);

    // Forward messages between the two sockets; no capture or control socket is used.
    zmq_proxy_steerable(frontend, backend, nullptr, nullptr);

    zmq_close(frontend);
    zmq_close(backend);

    rc = zmq_ctx_term(ctx);
    assert(rc == 0);
    return 0;
}

pub example

#include <cassert>
#include <chrono>
#include <iostream>
#include <thread>
#include <zmq.hpp>

using namespace std;
using namespace chrono;

int main(int argc, char* argv[])
{
    void* context = zmq_ctx_new();
    assert(context);
    /* Create a ZMQ_PUB socket */
    void* socket = zmq_socket(context, ZMQ_PUB);
    assert(socket);
    /* Connect it to the proxy frontend on localhost, port 5570, using a TCP transport */
    int rc = zmq_connect(socket, "tcp://localhost:5570");
    assert(rc == 0);

    /* Publish one message per second */
    while (true)
    {
        int len = zmq_send(socket, "hello", 5, 0);
        cout << "pub len = " << len << endl;
        this_thread::sleep_for(milliseconds(1000));
    }
}

sub example

#include <cassert>
#include <iostream>
#include <zmq.hpp>

using namespace std;

int main(int argc, char* argv[])
{
    void* context = zmq_ctx_new();
    assert(context);
    /* Create a ZMQ_SUB socket */
    void* socket = zmq_socket(context, ZMQ_SUB);
    assert(socket);
    /* Connect it to the proxy backend on localhost, port 5571, using a TCP transport */
    int rc = zmq_connect(socket, "tcp://localhost:5571");
    assert(rc == 0);
    /* Subscribe to everything; without this a SUB socket receives nothing */
    rc = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
    assert(rc == 0);

    while (true)
    {
        char buffer[1024] = {0};
        /* Leave room for the terminating NUL; longer messages are truncated */
        int len = zmq_recv(socket, buffer, sizeof(buffer) - 1, 0);
        cout << "len = " << len << endl;
        cout << "buffer = " << buffer << endl;
    }
}
he shouyong