2

I want to send a large string by zmq, here is the demo code:

for client:

#include "zmq.hpp"

int main(int argc, char ** argv) { 
  int port = 9443;
  zmq::context_t con(1);
  zmq::socket_t sock(con, ZMQ_SUB);
  std::string address = "tcp://address:" + std::to_string(port);  // address is the host ip
  sock.setsockopt(ZMQ_SUBSCRIBE, 0, 0);
  sock.connect(address.c_str());
  size_t size = 1024 * 32 * 1024; 
  char* buff = new char[size];
  while (1) { 
    size_t s = sock.recv(buff, size);
    printf("recv size %zu\n", s);
  } 
}

for server:

#include "zmq.hpp"

int main(int argc, char ** argv) {
  int port = 9443;
  zmq::context_t con(1);
  zmq::socket_t sock(con, ZMQ_PUB);
  std::string address = "tcp://*:" + std::to_string(port);
  sock.bind(address.c_str());
  size_t size = 1024 * 32 *1024; //  here is the problem, if the size is large, recv will fail, it works when size = 1024 * 32
  char* buff = new char[size];
  std::string sss;
  std::cin >> sss;
  std::cout << sock.send(buff, size) << std::endl;
}

The problem depends on the message size set in the server: if the size is 1024 * 32, send and recv both work.

But when the size is increased to 1024 * 1024 * 32, recv never returns.

Could you help with this? Is there any setting I can use to make this work?

nothingisme
  • 119
  • 5
  • https://stackoverflow.com/a/31501922/4123703 might be related. – Louis Go Apr 19 '23 at 06:33
  • When using the publish / subscribe pattern as written in your code, the subscriber may not get the first (and only in this case) message sent by the publisher because it may be sent before the subscription is active. This behavior may not be deterministic and could depend on things like message size. – RandomBits Apr 21 '23 at 15:51

1 Answer

0

==== EDIT ====

For the pub/sub pattern, the following code works with both small and large messages.

Publisher:

#include <iostream>
#include <thread>
#include <zmq.hpp>

// Publisher example: binds a PUB socket on port 5556 and sends ten messages,
// one per second, each consisting of the text "sequence" followed by
// 32 * 1024 * 1024 spaces. Each message is logged before it is sent.
int main(int argc, const char *argv[]) {

    zmq::context_t context(1);
    zmq::socket_t publisher(context, zmq::socket_type::pub);
    publisher.bind("tcp://*:5556");

    // The payload is identical on every iteration, so build the 32 MiB string
    // once here instead of reallocating and refilling it ten times.
    const std::string data = "sequence" + std::string(32 * 1024 * 1024, ' ');

    for (auto i = 0; i < 10; ++i) {
        // A fresh message is constructed from the shared payload for each send.
        zmq::message_t msg(data);
        std::cout << "pub: " << msg.str() << std::endl;
        auto rc = publisher.send(msg, zmq::send_flags::dontwait);
        assert(rc);
        // Pace the publisher so output from a running subscriber can interleave.
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    return 0;
}

Subscriber:

#include <iostream>
#include <zmq.hpp>

int main(int argc, const char *argv[]) {

    zmq::context_t context(1);
    zmq::socket_t subscriber(context, zmq::socket_type::sub);
    subscriber.connect("tcp://localhost:5556");
    subscriber.set(zmq::sockopt::subscribe, "sequence");
    for (auto i = 0; i < 5; ++i) {
        zmq::message_t msg;
        auto rc = subscriber.recv(msg, zmq::recv_flags::none);
        assert(rc);
        std::cout << "sub: " << msg << std::endl;
    };

    return 0;
}

Expected Output:

pub: zmq::message_t [size 33554440] (... too big to print)
pub: zmq::message_t [size 33554440] (... too big to print)
sub: zmq::message_t [size 33554440] (... too big to print)
pub: zmq::message_t [size 33554440] (... too big to print)
sub: zmq::message_t [size 33554440] (... too big to print)
pub: zmq::message_t [size 33554440] (... too big to print)
sub: zmq::message_t [size 33554440] (... too big to print)
pub: zmq::message_t [size 33554440] (... too big to print)
sub: zmq::message_t [size 33554440] (... too big to print)
pub: zmq::message_t [size 33554440] (... too big to print)
sub: zmq::message_t [size 33554440] (... too big to print)
pub: zmq::message_t [size 33554440] (... too big to print)
pub: zmq::message_t [size 33554440] (... too big to print)
pub: zmq::message_t [size 33554440] (... too big to print)
pub: zmq::message_t [size 33554440] (... too big to print)

==== End Edit ====

For the request/reply pattern, the following code works with both small and large messages.

Server:

#include <iostream>
#include <zmq.hpp>

// Reply server example: binds a REP socket on port 5556. Each request carries
// a decimal byte count; the server answers with that many 'x' characters and
// exits after serving a request whose count parses to 0.
int main(int argc, const char *argv[]) {

    zmq::context_t context(1);
    zmq::socket_t server(context, zmq::socket_type::rep);
    server.bind("tcp://*:5556");

    while (1) {
        zmq::message_t request;
        auto rc = server.recv(request);
        assert(rc);
        std::cout << "server recv: " << request.str() << std::endl;
        const auto n = atoi(request.to_string().c_str());

        // The count comes straight off the wire. A negative value would be
        // converted to an enormous unsigned length by std::string's constructor
        // and throw, killing the server, so treat it as an empty reply instead.
        const std::string::size_type reply_size =
            n > 0 ? static_cast<std::string::size_type>(n) : 0;

        std::string data(reply_size, 'x');
        zmq::message_t reply(data);
        std::cout << "server send: " << reply.str() << std::endl;
        server.send(reply, zmq::send_flags::dontwait);

        // A count of exactly 0 is the shutdown signal.
        if (n == 0)
            break;
    }

    return 0;
}

Client:

#include <iostream>
#include <zmq.hpp>

// Request client example: connects a REQ socket to localhost:5556 and performs
// three request/reply round trips, asking for replies of 1024, 32000000 and 0
// bytes. Every message sent or received is logged to stdout.
int main(int argc, const char *argv[]) {

    zmq::context_t context(1);
    zmq::socket_t client(context, zmq::socket_type::req);
    client.connect ("tcp://localhost:5556");

    // One round trip: send the requested size as text, then wait for the reply.
    // The size is taken by const reference; it is only read to build the request.
    auto exchange = [&](const std::string &size) {
        zmq::message_t request(size);
        // Label spelled "client" so it matches the sample output shown below.
        std::cout << "client send: " << request << std::endl;
        client.send(request, zmq::send_flags::none);

        zmq::message_t reply;
        auto rc = client.recv(reply, zmq::recv_flags::none);
        assert(rc);
        std::cout << "client recv: " << reply << std::endl;
    };

    exchange("1024");
    exchange("32000000");
    exchange("0");

    return 0;
}

Output:

client send: zmq::message_t [size 004] ( 1024)
server recv: zmq::message_t [size 004] ( 1024)
server send: zmq::message_t [size 1024] (... too big to print)
client recv: zmq::message_t [size 1024] (... too big to print)
client send: zmq::message_t [size 008] ( 32000000)
server recv: zmq::message_t [size 008] ( 32000000)
server send: zmq::message_t [size 32000000] (... too big to print)
client recv: zmq::message_t [size 32000000] (... too big to print)
client send: zmq::message_t [size 001] ( 0)
server recv: zmq::message_t [size 001] ( 0)
server send: zmq::message_t [size 000] ()
client recv: zmq::message_t [size 000] ()
RandomBits
  • 4,194
  • 1
  • 17
  • 30
  • It's good if you got your code to work. Can you explain what's different about your code and what's wrong with the asker's code, and how your updated code is different from your original code? – Gabriel Staples Apr 21 '23 at 14:02
  • 1
    The initial code uses the ZeroMQ request / reply pattern while the updated code uses the ZeroMQ publish / subscribe pattern as in the OP's question. I am unsure why OP's code does not work as I don't see an obvious problem. – RandomBits Apr 21 '23 at 14:09