Environment: NVIDIA-flavored Ubuntu 18.01 on their Jetson development board with their TX2i processor. ZMQ 4.3.2, utilizing the cppzmq C++ wrapper for ZMQ.
I've got a slew of code running using google protocol buffers with ZeroMQ, and it's all PUSH/PULL, and it works fine except I've got one case that isn't point-to-point, but 1:3. The correct solution here is to do PUB/SUB, but I cannot get messages through to my subscriber.
I shaved my code down to this simple example. With the #define PUB_SUB statements active, the subscriber gets nothing. With them commented out (so the code compiles as PUSH/PULL instead of PUB/SUB), the subscriber gets the message as expected.
Given the excessive sleep_for() times, I would expect the subscriber to have ample time to register before the publisher performs the send.
EDIT:
Why the try/catch on the subscriber? I was getting an exception early on, and believed it was because the publisher wasn't ready. This no longer appears to be the case, so it wasn't what I thought it was.
// Publisher
#include "/usr/local/include/zmq.hpp"
#include "protobuf_namespace.pb.h"
#include <chrono>
#include <thread>
#define PUB_SUB
int main( void )
{
zmq::context_t* m_pContext = new zmq::context_t( 1 );
#ifdef PUB_SUB
zmq::socket_t* m_pSocket = new zmq::socket_t( *m_pContext, ZMQ_PUB );
#else
zmq::socket_t* m_pSocket = new zmq::socket_t( *m_pContext, ZMQ_PUSH );
#endif
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
//m_pSocket->bind( "tcp://*:53001" ); // using '*' or specific IP doesn't change result
m_pSocket->bind( "tcp://127.0.0.1:53001" );
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
// Send the parameters
protobuf_namespace::Params params;
params.set_calibrationdata( protobuf_namespace::CalDataType::CAL_REQUESTED ); // init one value to non-zero
std::string params_str = params.SerializeAsString();
zmq::message_t zmsg( params_str.size() );
memcpy( zmsg.data(), params_str.c_str(), params_str.size() );
m_pSocket->send( zmsg, zmq::send_flags::none );
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
m_pSocket->close();
zmq_ctx_destroy( m_pContext );
}
// Subscriber - start me first!
#include "/usr/local/include/zmq.hpp"
#include "protobuf_namespace.pb.h"
#include <chrono>
#include <thread>
#include <stdio.h>
#define PUB_SUB
int main( void )
{
zmq::context_t* m_pContext = new zmq::context_t( 1 );
#ifdef PUB_SUB
zmq::socket_t* m_pSocket = new zmq::socket_t( *m_pContext, ZMQ_SUB );
m_pSocket->connect( "tcp://127.0.0.1:53001" );
int linger = 0;
zmq_setsockopt( m_pSocket, ZMQ_LINGER, &linger, sizeof( linger ) );
zmq_setsockopt( m_pSocket, ZMQ_SUBSCRIBE, "", 0 );
#else
zmq::socket_t* m_pSocket = new zmq::socket_t( *m_pContext, ZMQ_PULL );
m_pSocket->connect( "tcp://127.0.0.1:53001" );
#endif
protobuf_namespace::Params params;
zmq::message_t zmsg;
bool retry = true;
do {
try {
m_pSocket->recv( zmsg, zmq::recv_flags::none );
retry = false;
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
} catch( ... ) {
printf("caught\n");
}
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
} while( retry );
std::string param_str( static_cast<char*>( zmsg.data() ), zmsg.size() );
params.ParseFromString( param_str );
if( params.calibrationdata() == protobuf_namespace::CalDataType::CAL_REQUESTED )
printf( "CAL_REQUESTED\n" );
else
printf( "bad data\n" );
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
m_pSocket->close();
zmq_ctx_destroy( m_pContext );
}