Welcome to the Zen-of-Zero :
ZeroMQ comes with Zero warranties, yet a few steps in the right direction can be taken here. If new to ZeroMQ, feel free to read a few of the other posts here, at least "ZeroMQ Principles in less than Five Seconds", before diving into further details :
CLIENT-code conceptual template :
import zmq; print( zmq.zmq_version() ) # INF:
aCtx = zmq.Context( 4 ) # request 4-I/O-threads
aPUB = aCtx.socket( zmq.PUB ) # PUB-instance
aPUB.setsockopt( zmq.LINGER, 0 ) # avoid deadlock on close
aPUB.setsockopt( zmq.SNDBUF, 3 * PayLoadSIZE ) # FullHD ~ 6 MB, 4K ~ ...
aPUB.setsockopt( zmq.SNDHWM, aNumOfPicsInQUEUE ) # 1, ~3? ~10?, !1000 ...
aPUB.setsockopt( zmq.IMMEDIATE, 1 ) # queue only onto completed connections
aPUB.setsockopt( zmq.CONFLATE, 1 ) # do not re-send "old"
aPUB.bind( <transport-class>:<port#> ) # tcp:? udp-multicast?
#-----------------------------------------------------------------------------[RTO]
# may like to set aPayLOAD = gzip.compress( dill.dumps( capture ), compressionLEVEL )
# yields reduced sizes of the serialised <capture> data
# at costs of about ~30~60 [ms] on either side
# which may lower the network traffic and .SNDBUF-sizing issues
#----------------------------------------------------------------------
while <any reason>:
    try:
        aPUB.send( aPayLOAD, zmq.NOBLOCK )
    except zmq.ZMQError as e:
        pass # handle as per e.errno ...
    finally:
        pass
#----------------------------------------------------------------------
aPUB.close()
aCtx.term()
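The serialisation step mentioned in the comments above can be tried out in isolation. A minimal sketch, assuming the standard-library pickle as a stand-in for dill, and an all-zero FullHD frame at 3 bytes per pixel as a stand-in for a real capture. The helper names raw_frame_bytes, pack and unpack are hypothetical:

```python
import gzip
import pickle


def raw_frame_bytes(width, height, channels=3):
    """Size of one uncompressed frame, assuming 1 byte per channel."""
    return width * height * channels


def pack(capture, level=1):
    """Serialise, then gzip-compress (pickle used here instead of dill)."""
    raw = pickle.dumps(capture, protocol=pickle.HIGHEST_PROTOCOL)
    return gzip.compress(raw, compresslevel=level)


def unpack(payload):
    """Inverse of pack(): decompress, then deserialise."""
    return pickle.loads(gzip.decompress(payload))


full_hd = raw_frame_bytes(1920, 1080)            # the "FullHD ~ 6 MB" figure
capture = {"seq": 1, "pixels": bytes(full_hd)}   # all-zero stand-in frame
payload = pack(capture)
restored = unpack(payload)

print("raw frame [B]:", full_hd, " payload [B]:", len(payload))
```

An all-zero frame compresses far better than a real picture would, so the printed ratio says nothing about production data. Unpickling (with pickle or dill) executes code chosen by the sender, so this only fits links where both ends are trusted.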
SERVER-code conceptual template :
import zmq; print( zmq.zmq_version() ) # INF:
aCtx = zmq.Context() # default: 1 I/O-thread
aSUB = aCtx.socket( zmq.SUB ) # SUB-instance
aSUB.setsockopt( zmq.LINGER, 0 ) # avoid deadlock on close
aSUB.setsockopt( zmq.RCVBUF, 3 * PayLoadSIZE ) # FullHD ~ 6 MB, 4K ~ ...
aSUB.setsockopt( zmq.RCVHWM, aNumOfPicsInQUEUE ) # 1, ~3? ~10?, !1000 ...
aSUB.setsockopt( zmq.IMMEDIATE, 1 ) # queue only onto completed connections
aSUB.setsockopt( zmq.CONFLATE, 1 ) # do not re-recv "old"
aSUB.setsockopt( zmq.SUBSCRIBE, b"" ) # do subscribe to whatever comes ( bytes, not str, in Python 3 )
aSUB.connect( <transport-class>:<port#> ) # tcp:? udp-multicast?
#-----------------------------------------------------------------------------[RTO]
while <any reason>:
    try:
        if ( aSUB.poll( 0, zmq.POLLIN ) == 0 ): # .poll( timeout [ms], flags ), 0 ~ do not wait
            # nothing in the receiving Queue ready-to-.recv()
            # sleep()
            # do some system work etc
            pass
        else:
            aPayLOAD = aSUB.recv( zmq.NOBLOCK )
            #--------------------------------------------------------
            # decompress / deserialise the original object
            # capture = dill.loads( gzip.decompress( aPayLOAD ) )
            #--------------------------------------------------------
            # PROCESS THE DATA :
            # ...
    except zmq.ZMQError as e:
        pass # handle as per e.errno ...
    finally:
        pass
#----------------------------------------------------------------------
aSUB.close()
aCtx.term()
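The "handle as per errno" placeholder may be filled in along these lines. A minimal sketch, assuming pyzmq is installed. The helper name try_recv is hypothetical, and the SUB socket is deliberately left unconnected so that the empty-queue path is the one exercised:

```python
import zmq


def try_recv(sock):
    """Return one message, or None when nothing is ready to be received."""
    try:
        return sock.recv(zmq.NOBLOCK)
    except zmq.Again:
        # EAGAIN: the receive queue is empty, not a failure
        return None
    except zmq.ZMQError as err:
        # any other errno (ETERM, ENOTSOCK, ...) is left to the caller
        print("ZeroMQ error:", err.errno, err)
        raise


ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.LINGER, 0)
sub.setsockopt(zmq.SUBSCRIBE, b"")

result = try_recv(sub)      # nothing connected, so nothing can arrive
print("received:", result)

sub.close()
ctx.term()
```

Catching zmq.Again separately keeps the common "no frame yet" case out of the error path, while every other errno still surfaces.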