
I have a Raspberry Pi client and 4 Raspberry Pi servers. I want the client to send a string message to all 4 servers simultaneously to trigger an image capture. Right now I send it to each server sequentially, with something like the following:

socket.send(capture)
socket1.send(capture)
socket2.send(capture)
socket3.send(capture)

Would changing to something like a publish/subscribe model reduce the spread between the times at which the servers receive the message? I would like all 4 servers to get the capture message within 5 ms or less of each other.

user3666197
Lightsout
  • You can use the PUB/SUB pattern. Your client would bind to an IP:PORT, and your servers would connect to the client's IP:PORT and set an empty string as the subscription filter (please check the documentation). The client then PUBLISHes a string, which is sent to all subscribed servers simultaneously. Once the servers receive the message, they can start the capture process. – jschiavon Sep 06 '19 at 22:05
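Whichever pattern is used, the 5 ms target can only be judged by comparing when each server actually received the trigger. A minimal sketch of that check follows, assuming each server logs its receive time in seconds and that the servers' clocks are synchronised (e.g. via NTP or PTP); the function names and the sample timestamps are hypothetical, not part of the original post.

```python
def arrival_spread_ms(receive_times_s):
    """Return the gap between the earliest and latest arrival, in milliseconds."""
    if len(receive_times_s) < 2:
        return 0.0
    return (max(receive_times_s) - min(receive_times_s)) * 1000.0


def within_target(receive_times_s, target_ms=5.0):
    """True if all arrivals fall within target_ms of each other."""
    return arrival_spread_ms(receive_times_s) <= target_ms


# Hypothetical receive timestamps (seconds) reported by the four servers.
sample_times = [10.0000, 10.0012, 10.0031, 10.0020]
print(f"spread = {arrival_spread_ms(sample_times):.2f} ms, "
      f"within 5 ms: {within_target(sample_times)}")
```

If the clocks are not synchronised, the spread measured this way mostly reflects clock offset rather than delivery skew.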

1 Answer


Welcome to the Zen-of-Zero:

While ZeroMQ comes with Zero warranties, a few steps can move things in the right direction. If you are new to ZeroMQ, feel free to read a few of the posts here, and at least "ZeroMQ Principles in less than Five Seconds", before diving into further details:

CLIENT-code conceptual template :

   import time
   import zmq;                 print( zmq.zmq_version() ) # INF: libzmq version in use

   PayLoadSIZE       = 6 * 1024 * 1024                    # ASSUMED: FullHD ~ 6 MB, 4K ~ ...
   aNumOfPicsInQUEUE = 3                                  # ASSUMED: 1, ~3? ~10?, !1000 ...
   aPayLOAD          = b"capture"                         # ASSUMED: the trigger, as bytes

   aCtx = zmq.Context( 4 )                                # request 4-I/O-threads
   aPUB = aCtx.socket( zmq.PUB )                          # PUB-instance
   aPUB.setsockopt(    zmq.LINGER,   0 )                  # avoid deadlock on close
   aPUB.setsockopt(    zmq.SNDBUF, 3 * PayLoadSIZE )      # kernel send-buffer size [B]
   aPUB.setsockopt(    zmq.SNDHWM, aNumOfPicsInQUEUE )    # max messages queued per peer
   aPUB.setsockopt(    zmq.IMMEDIATE, 1 )                 # ignore L1/L2-incomplete(s)
   aPUB.setsockopt(    zmq.CONFLATE, 1 )                  # do not re-send "old"
   aPUB.bind( "tcp://*:5555" )                            # ASSUMED endpoint; tcp:? udp-multicast?
   #-----------------------------------------------------------------------------[RTO]
   # may like to set aPayLOAD = gzip.compress( dill.dumps( capture ), compressionLEVEL )
   #                 yields reduced sizes of the serialised <capture> data
   #                 at costs of about ~30~60 [ms] on either side
   #                 which may lower the network traffic and .SNDBUF-sizing issues
   #----------------------------------------------------------------------
   try:
        while True:                                       # <any reason> to keep publishing
              try:
                   aPUB.send( aPayLOAD, zmq.NOBLOCK )
              except zmq.Again:
                   pass                                   # not sendable right now, skip
              except zmq.ZMQError as e:
                   print( e.errno, e.strerror )           # handle as per errno ...
              time.sleep( 1 )                             # ASSUMED pacing between triggers
   except KeyboardInterrupt:
        pass                                              # Ctrl-C ends the loop
   #----------------------------------------------------------------------
   finally:
        aPUB.close()
        aCtx.term()
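The comment in the template about compressing the payload puts the cost at roughly 30 to 60 ms per side, which would dwarf a 5 ms budget, so it is worth measuring on the actual hardware. The sketch below times one round trip; it swaps in the standard-library `pickle` for `dill` so that it runs without extra packages, and the `pack`, `unpack` and `timed_ms` helpers and the synthetic 1 MiB frame are illustrative assumptions. Real image data will compress very differently.

```python
import gzip
import pickle
import time


def pack(obj, level=6):
    """Serialise with pickle (stand-in for dill), then gzip-compress."""
    return gzip.compress(pickle.dumps(obj), compresslevel=level)


def unpack(blob):
    """Reverse of pack(): decompress, then deserialise."""
    return pickle.loads(gzip.decompress(blob))


def timed_ms(fn, *args):
    """Call fn(*args) and return (result, elapsed milliseconds)."""
    t0 = time.perf_counter()
    result = fn(*args)
    return result, (time.perf_counter() - t0) * 1000.0


# Synthetic, highly compressible 1 MiB stand-in for a captured frame.
frame = bytes(range(256)) * 4096

blob, pack_ms = timed_ms(pack, frame)
restored, unpack_ms = timed_ms(unpack, blob)

print(f"raw {len(frame)} B -> packed {len(blob)} B")
print(f"pack {pack_ms:.1f} ms, unpack {unpack_ms:.1f} ms")
```

For a short trigger string such as `capture`, compression adds latency without any benefit, so it only makes sense if image data travels over the same socket.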

SERVER-code conceptual template :

   import time
   import zmq;                 print( zmq.zmq_version() ) # INF: libzmq version in use

   PayLoadSIZE       = 6 * 1024 * 1024                    # ASSUMED: FullHD ~ 6 MB, 4K ~ ...
   aNumOfPicsInQUEUE = 3                                  # ASSUMED: 1, ~3? ~10?, !1000 ...

   aCtx = zmq.Context()                                   # default: 1 I/O-thread
   aSUB = aCtx.socket( zmq.SUB )                          # SUB-instance
   aSUB.setsockopt(    zmq.LINGER,   0 )                  # avoid deadlock on close
   aSUB.setsockopt(    zmq.RCVBUF, 3 * PayLoadSIZE )      # kernel recv-buffer size [B]
   aSUB.setsockopt(    zmq.RCVHWM, aNumOfPicsInQUEUE )    # max messages queued
   aSUB.setsockopt(    zmq.IMMEDIATE, 1 )                 # ignore L1/L2-incomplete(s)
   aSUB.setsockopt(    zmq.CONFLATE, 1 )                  # do not re-recv "old"
   aSUB.setsockopt(    zmq.SUBSCRIBE, b"" )               # bytes filter: whatever comes
   aSUB.connect( "tcp://127.0.0.1:5555" )                 # ASSUMED: use the client's IP:PORT
   #-----------------------------------------------------------------------------[RTO]
   try:
        while True:                                       # <any reason> to keep listening
              try:
                   # pyzmq signature is .poll( timeout, flags ), timeout in [ms]
                   if ( aSUB.poll( timeout = 0, flags = zmq.POLLIN ) == 0 ):
                      # nothing in the receiving Queue ready-to-.recv()
                      time.sleep( 0.0005 )                # ASSUMED nap, or do some system work
                   else:
                      aPayLOAD = aSUB.recv( zmq.NOBLOCK )
                      #--------------------------------------------------------
                      # decompress / deserialise the original object
                      # capture = dill.loads( gzip.decompress( aPayLOAD ) )
                      #--------------------------------------------------------
                      # PROCESS THE DATA :
                      # ...
              except zmq.ZMQError as e:
                   print( e.errno, e.strerror )           # handle as per errno ...
   except KeyboardInterrupt:
        pass                                              # Ctrl-C ends the loop
   #----------------------------------------------------------------------
   finally:
        aSUB.close()
        aCtx.term()
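Before deploying to five boards, the PUB/SUB flow can be tried on a single machine. The following is only a sketch under stated assumptions: it needs the `pyzmq` package, four threads stand in for the four servers, everything runs over the loopback interface on a random port, and the 0.5 s pause is a crude way of letting the subscriptions settle (a PUB socket silently drops messages for subscribers that have not finished joining). The names `server`, `run_demo`, `N_SERVERS` and `TRIGGER` are illustrative.

```python
import threading
import time

import zmq

N_SERVERS = 4          # assumption: matches the four servers in the question
TRIGGER = b"capture"   # assumption: the trigger message, sent as bytes


def server(ctx, endpoint, idx, arrivals):
    """Stand-in for one server: subscribe, wait for the trigger, note its arrival."""
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.LINGER, 0)
    sub.setsockopt(zmq.SUBSCRIBE, b"")            # empty filter: receive everything
    sub.connect(endpoint)
    if sub.poll(timeout=5000, flags=zmq.POLLIN):  # wait up to 5 s for the trigger
        message = sub.recv(zmq.NOBLOCK)
        arrivals[idx] = (message, time.perf_counter())
    sub.close()


def run_demo():
    """Publish one trigger to N_SERVERS subscriber threads; return their arrivals."""
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.setsockopt(zmq.LINGER, 0)
    port = pub.bind_to_random_port("tcp://127.0.0.1")  # loopback only
    endpoint = f"tcp://127.0.0.1:{port}"

    arrivals = [None] * N_SERVERS
    threads = [
        threading.Thread(target=server, args=(ctx, endpoint, i, arrivals))
        for i in range(N_SERVERS)
    ]
    for t in threads:
        t.start()

    time.sleep(0.5)    # crude wait so that all subscriptions are in place
    pub.send(TRIGGER)

    for t in threads:
        t.join()
    pub.close()
    ctx.term()
    return arrivals


arrivals = run_demo()
times = [a[1] for a in arrivals if a is not None]
if times:
    spread_ms = (max(times) - min(times)) * 1000.0
    print(f"{len(times)} of {N_SERVERS} received; spread = {spread_ms:.3f} ms")
```

The spread printed here only reflects thread scheduling on one host. On real hardware the network, the switch and each Pi's scheduler all add to it, so the same measurement has to be repeated on the target setup with synchronised clocks.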
user3666197
  • What does the `zmq.COMPLETE` option do? – Benyamin Jafari Oct 13 '19 at 06:13
  • @BenyaminJafari Mea culpa: the code was typed in a hurry and the idea was faster than my typing skills. The proper option to use is the **`zmq.IMMEDIATE`** module constant, which prevents message payloads from being queued for L1/L2-incomplete connections (the ZeroMQ ZMTP/RFC documents are clear on how rich the stateful connection management is). This avoids adding latency in steps that make no sense and that may complicate reaching the `<= 5ms` delivery target. Once more, excuse my error. – user3666197 Oct 13 '19 at 07:10