ZeroMQ ( version zeromq-4.1.6 ) PGM multicast reception gets stuck partway through a transfer, even though the Sender keeps sending packets without any issue.

If we restart the Receiver, the application receives packets again, but that is not a solution. I tried various ZMQ_RATE values on both the Sender and the Receiver side.

Issue:

The Sender sends almost 300,000 packets with the following socket options, but the Receiver gets stuck partway through and does not receive all of them. If we add Sleep( 2 ), i.e. wait 2 ms after each send, we sometimes receive all the packets, but the transfer takes much longer.

Environment Setup:

( Sender & Receiver are connected within a single subnet through a D-Link switch. Media speed is 1 Gbps )

Sender: JZMQ ( ZMQ C library, openPGM )
ZMQ_RATE - 30Mbps ( Megabits per second )
Packet size - 1024 bytes
ZMQ_RECOVERY_IVL - 2 Minutes
Send Flag - 0 ( blocking mode )
Sleep( 2ms ) - sometimes it works without any issue, but the transfer takes more time.
Platform - Windows

Receiver: ZMQ C++ ( ZMQ C library, openPGM )
ZMQ_RATE - 30Mbps ( Megabits per second )
ZMQ_RCVTIMEO - 3 Secs
receive Flag - 0 ( blocking mode )
Platform - Windows
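
For scale, here is a back-of-the-envelope sketch of what the figures above imply. The helper names are hypothetical (they are not part of ZeroMQ), the rate is expressed in the ZMQ_RATE unit of kilobits per second, and PGM/IP header overhead is ignored, so the results are lower bounds only.

```cpp
#include <cstdint>

// Packets per second that fit into a rate limit.
// rate_kbps uses the ZMQ_RATE unit (kilobits per second).
// Assumption of this sketch: only payload bytes count, headers are ignored.
constexpr double packets_per_second(std::int64_t rate_kbps, std::int64_t packet_bytes) {
    return (rate_kbps * 1000.0) / (packet_bytes * 8.0);
}

// Lower bound, in seconds, for pushing `count` packets through that rate limit.
constexpr double min_transfer_seconds(std::int64_t count,
                                      std::int64_t rate_kbps,
                                      std::int64_t packet_bytes) {
    return count / packets_per_second(rate_kbps, packet_bytes);
}

// Lower bound, in seconds, when every send is followed by a fixed sleep.
constexpr double min_transfer_seconds_with_sleep(std::int64_t count, double sleep_ms) {
    return count * sleep_ms / 1000.0;
}
```

With 30 Mbps ( 30000 kbps ) and 1024-byte packets this gives roughly 3,662 packets per second, so 300,000 packets need at least about 82 seconds. With Sleep( 2 ) after every send the floor rises to 600 seconds, which matches the observation that the slowed-down transfer takes much longer.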

What can be the issue?

Is ZeroMQ PGM-multicast not a stable library?

JZMQ Sender:
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket socket = context.socket(ZMQ.PUB);
socket.setRate(80000);
socket.setRecoveryInterval(60*60);
socket.setSendTimeOut(-1);
socket.setSendBufferSize(1024*64);
socket.bind("pgm://local_IP;239.255.0.20:30001");

byte[] bytesToSend = new byte[1024];
int count = 0;
while(count < 300000) {
    socket.send(bytesToSend, 0);
    count++;
}
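
A note on units, as a hedged sketch rather than a statement about libzmq internals: the ZeroMQ 4.x documentation gives ZMQ_RATE in kilobits per second and ZMQ_RECOVERY_IVL in milliseconds, and warns that the data needed for recovery is held in memory. Assuming jzmq passes both values through unchanged, the hypothetical helper below estimates how much data a sender would have to retain for a given pair of settings.

```cpp
#include <cstdint>

// Approximate number of bytes a sender must keep available for repairs:
//   (rate_kbps * 1000 / 8) bytes per second  *  (recovery_ivl_ms / 1000) seconds
// which simplifies to rate_kbps * recovery_ivl_ms / 8.
// This is an estimate only; the real PGM window sizing may differ.
constexpr std::int64_t recovery_buffer_bytes(std::int64_t rate_kbps,
                                             std::int64_t recovery_ivl_ms) {
    return rate_kbps * recovery_ivl_ms / 8;
}
```

Under that assumption, setRate(80000) with setRecoveryInterval(60*60) describes 80 Mbps and a 3.6 second recovery window ( about 36 MB ), not the 30 Mbps and 2 minutes listed in the setup. The listed values would be 30000 and 120000, which works out to about 450 MB.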

------------------------------------------------
// ZMQCPP-PGM-receive.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <stdio.h>
#include "zmq.hpp"


int main(int argc, char* argv[]) {
    try {

         zmq::context_t context(1);

      // Socket to talk to server
         printf ("Connecting to server...");

         zmq::socket_t *s1 = new zmq::socket_t(context, ZMQ_SUB);

         int recvTimeout = 3000;
         s1->setsockopt(ZMQ_RCVTIMEO,&recvTimeout,sizeof(int));

         int recvRate = 80000;
         s1->setsockopt(ZMQ_RATE,&recvRate,sizeof(int));

         int recsec = 60 * 60;
      // s1->setsockopt(ZMQ_RECOVERY_IVL,&recsec,sizeof(recsec));

         s1->connect("pgm://local_IP;239.255.0.20:30001");

         s1->setsockopt (ZMQ_SUBSCRIBE, NULL, 0);

         printf ("done. \n");
         int seq=0;
         while(true) {

               zmq::message_t msgbuff;

               int ret = s1->recv(&msgbuff,0);
               if(!ret)
               {
                   printf ("Receive timed out, no message received\n");
                   continue;
               }

               printf ("Seq(%d) Received data size=%d\n",seq,(int)msgbuff.size());   // size() returns size_t, so cast for %d
               ++seq;
         }
    }
    catch( zmq::error_t &e )   {
           printf ("An error occurred: %s\n", e.what());
           return 1;
    }
    catch( std::exception &e ) {
           printf ("An error occurred: %s\n", e.what());
           return 1;
    }
    return 0;
}
my2117
  • And where is the MCVE-code to replicate and diagnose the root-cause? Or do you expect to receive an email with a HiRes X-ray picture, just from telling your doctor over a phone 5 Lakh times, that your left hand seems to have been broken once riding a bicycle last weekend. :o) No, the world does not work this way. – user3666197 Jun 13 '17 at 18:58
  • @user3666197 Hi added zmq pgm multicast sender & receiver code snippet – my2117 Jun 14 '17 at 07:25

1 Answer

Is PGM stable?
FYI: it has been working since v2.1.1, and today we have the stable 4.2.+ releases.

It is not good practice, and I would not dare, to accuse the library maintainers of not having thoroughly tested PGM/EPGM before a release, or of having done a poor job in development, before one's own application design has been well understood, robustly designed, well diagnosed, and performance- / latency-tested against the reality of the actual deployment ecosystem, which typically consists of
{ localhost | home-subnet | remote-network(s) | remote-host(s) }.


[PUB]-sending part needs to get due care:

If nothing else, this part of the documentation rings all the bells and blows all the whistles about inadequate resource management in a few mock-up SLOCs. Due care is indeed needed for brutal attempts to send in a non-blocking, super-fast loop:

ØMQ does not guarantee that the socket will accept as many as ZMQ_SNDHWM messages, and the actual limit may be as much as 60-70% lower depending on the flow of messages on the socket.

So it may well be that your [PUB]-sender drops the missing messages before they ever get down onto the wire.
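
One way to test this suspicion is to number the messages and count gaps on the receiving side. The sketch below is illustrative only and is not in the original code: it assumes the sender writes a 64-bit counter, big-endian, into the first 8 bytes of every payload ( in Java, ByteBuffer.putLong does this by default ), and all names are hypothetical.

```cpp
#include <cstdint>

// Write a 64-bit sequence number into buf[0..7], most significant byte first.
inline void put_seq_be(unsigned char* buf, std::uint64_t seq) {
    for (int i = 7; i >= 0; --i) {
        buf[i] = static_cast<unsigned char>(seq & 0xFF);
        seq >>= 8;
    }
}

// Read a big-endian 64-bit sequence number from buf[0..7].
inline std::uint64_t get_seq_be(const unsigned char* buf) {
    std::uint64_t seq = 0;
    for (int i = 0; i < 8; ++i) seq = (seq << 8) | buf[i];
    return seq;
}

// Tracks how many sequence numbers were skipped between received messages.
struct GapTracker {
    bool          started  = false;
    std::uint64_t expected = 0;   // next sequence number we expect to see
    std::uint64_t received = 0;   // messages seen so far
    std::uint64_t missing  = 0;   // total sequence numbers skipped so far

    // Returns how many messages were skipped immediately before `seq`.
    std::uint64_t on_message(std::uint64_t seq) {
        std::uint64_t gap = 0;
        if (started && seq > expected) gap = seq - expected;
        if (!started || seq >= expected) expected = seq + 1;
        started = true;
        ++received;
        missing += gap;
        return gap;
    }
};
```

In the [SUB] loop one would call on_message( get_seq_be( ... ) ) on the bytes of each received message, provided the message is at least 8 bytes long. A non-zero gap only shows that messages are missing; it does not tell whether they were dropped inside the sender or lost on the network.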

Next warning comes from the O/S privileges:

The pgm transport implementation requires access to raw IP sockets. Additional privileges may be required on some operating systems for this operation. Applications not requiring direct interoperability with other PGM implementations are encouraged to use the epgm transport instead which does not require any special privileges.
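
Since the pgm and epgm transports share the same address format in the ZeroMQ documentation, trying epgm is mostly a matter of changing the endpoint prefix. A tiny sketch follows, with a hypothetical helper name; both the [PUB] and the [SUB] side have to use the same transport.

```cpp
#include <string>

// Turn a "pgm://..." endpoint into the equivalent "epgm://..." endpoint.
// Any other endpoint ( including one that is already epgm ) is returned unchanged.
inline std::string to_epgm(const std::string& endpoint) {
    const std::string pgm = "pgm://";
    if (endpoint.compare(0, pgm.size(), pgm) == 0) return "e" + endpoint;
    return endpoint;
}
```

For the endpoint used in the question this yields "epgm://local_IP;239.255.0.20:30001", which would then be passed to bind() on the sender and connect() on the receiver.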


Next comes the [SUB]-receiver:

Some more tuning would help here. The [PUB]-sender can be instrumented in a similar way to the inlined status / tracing tools proposed below for the [SUB]-receiver:

------------------------------------------------
// ZMQCPP-PGM-receive.cpp : Defines the entry point for the console application.
//                          MODs: https://stackoverflow.com/q/44526517/3666197

#include "stdafx.h"
#include <stdio.h>
#include "zmq.hpp"

#include <chrono>                                                       // since C++ 11
typedef std::chrono::high_resolution_clock              nanoCLK;

#define ZMQ_IO_THREAD_POOL_SIZE                         8

#define ZMQ_AFINITY_PLAIN_ROUNDROBIN_UNMANAGED_RISKY    0
#define ZMQ_AFINITY_LO_PRIO_POOL                        0 | 1
#define ZMQ_AFINITY_HI_PRIO_POOL                        0 | 0 | 2
#define ZMQ_AFINITY_MC_EPGM_POOL                        0 | 0 | 0 | 4 | 8 | 0 | 0 | 64 | 128


int main( int argc, char* argv[] ) {

    auto RECV_start = nanoCLK::now();
    auto RECV_ret   = nanoCLK::now();
    auto RECV_last  = nanoCLK::now();
    auto TEST_start = nanoCLK::now();

    try {
           zmq::context_t context( ZMQ_IO_THREAD_POOL_SIZE );           printf ( "Connecting to server..." );
           int            major,  minor,  patch;
           zmq::version( &major, &minor, &patch );                      printf ( "Using ZeroMQ( %d.%d.%d )", major, minor, patch );

           zmq::socket_t *s1 = new zmq::socket_t( context, ZMQ_SUB );   // Socket to talk to server

           uint64_t zmqAffinity =  0;          // [   #]  mapper bitmap-onto-IO-thread-Pool (ref. #define-s above ), ZMQ_AFFINITY expects a uint64_t
           int64_t  recvMaxSize =  9876;       // [   B]  ZMQ_MAXMSGSIZE expects an int64_t

           int zmqLinger   =       0,          // [  ms]

               recvBuffer  =       2 * 123456, // [   B]
               recvHwMark  =  123456,          // [   #]  max number of MSGs allowed to be Queued per connected Peer

               recvRate    =   80000 * 10,     // [kbps]
               recvTimeout =    3000,          // [  ms]  before ret EAGAIN { 0: NO_BLOCK | -1: INF | N: wait [ms] }
               recoverMSEC =      60 * 60      // [  ms]
               ;

           // pass the real size of each option value, a wrong size is rejected with EINVAL
           s1->setsockopt ( ZMQ_AFFINITY,     &zmqAffinity, sizeof(zmqAffinity) );
           s1->setsockopt ( ZMQ_LINGER,       &zmqLinger,   sizeof(int) );
           s1->setsockopt ( ZMQ_MAXMSGSIZE,   &recvMaxSize, sizeof(recvMaxSize) );
           s1->setsockopt ( ZMQ_RCVBUF,       &recvBuffer,  sizeof(int) );
           s1->setsockopt ( ZMQ_RCVHWM,       &recvHwMark,  sizeof(int) );
           s1->setsockopt ( ZMQ_RCVTIMEO,     &recvTimeout, sizeof(int) );
           s1->setsockopt ( ZMQ_RATE,         &recvRate,    sizeof(int) );
     //    s1->setsockopt ( ZMQ_RECOVERY_IVL, &recoverMSEC, sizeof(int) );

           s1->connect ( "pgm://local_IP;239.255.0.20:30001" );
           s1->setsockopt ( ZMQ_SUBSCRIBE, NULL, 0 );                   printf ( "done. \n" );

           int seq = 0;
           while( true ) {
                  zmq::message_t         msgbuff;                  RECV_start = nanoCLK::now(); RECV_last = RECV_ret;
                  int   ret = s1->recv( &msgbuff, 0 );             RECV_ret   = nanoCLK::now();

                  // duration counts are 64-bit, so store them as long long and print them with %lld
                  long long T0_ns   = std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - TEST_start ).count();
                  long long ovhd_ns = std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_last  ).count();
                  long long wait_ns = std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - RECV_start ).count();

                  if ( !ret )                                           printf ( "[T0+ %14lld [ns]]: [SUB] did not receive any message within set timeout(%d). RC == %d LOOP_ovhd == %6lld [ns] RECV_wait == %10lld [ns]\n", T0_ns, recvTimeout, ret, ovhd_ns, wait_ns );
                  else                                                  printf ( "[T0+ %14lld [ns]]: [SUB] did now receive   a message SEQ#(%6d.) DATA[%6d] B. RC == %d LOOP_ovhd == %6lld [ns] RECV_wait == %10lld [ns]\n", T0_ns, ++seq, (int) msgbuff.size(), ret, ovhd_ns, wait_ns );
           }
    }
    catch( zmq::error_t   &e ) {                                        printf ( "[T0+ %14lld [ns]]: [EXC.ZMQ] An error occurred: %s\nWill RET(1)", (long long) std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - TEST_start ).count(), e.what() );
           return 1;
    }
    catch( std::exception &e ) {                                        printf ( "[T0+ %14lld [ns]]: [EXC.std] An error occurred: %s\nWill RET(1)", (long long) std::chrono::duration_cast<std::chrono::nanoseconds>( RECV_ret - TEST_start ).count(), e.what() );
           return 1;
    }
    return 0;
}
user3666197