1

Model:

Where A in theory 'acts' like a server, where D and E are both a subscriber and a publisher and F is a subscriber.

An example would be highly appreciated, if it indeed is feasible.

If not, please provide an alternative. Websockets are not an option in my goal.

user3666197
  • 1
  • 6
  • 50
  • 92
Bondeaux
  • 174
  • 1
  • 3
  • 10

1 Answers1

1

Certainly can:

Best
first read about the main conceptual differences in the [ ZeroMQ hierarchy in less than a five seconds ] Section.

For a simplicity, let's assume just a tcp:// transport-class and nodes having individual IP-addresses ( easy to transform to any co-located case and/or different transport-class mix ).

Performance tweaking is a must for any larger volumes and/or strict latency management cases.

Node-A:

import time, zmq; aCTX = zmq.Context(); aPUB = aCTX.socket( zmq.PUB )
pass;                                   aPUB.bind( "tcp:10.0.0.1:12345" )
pass;                                   aPUB.setsockopt( zmq.CONFLATE, 1 )
pass;                                   print( "A: Started. Can Ctrl+C." )
while True:
      try:
           aPUB.send( "A: sending ...[{0:}]".format( time.ctime() ); time.sleep( 1 )
      except KeyboardInterrupt:
           pass;                        print( "A: Ctrl+C'd. Will terminate" ); break
pass;
aPUB.close()
aCTX.term()

Node(s)-B(,C):

import time, zmq; aCTX = zmq.Context(); aPUB = aCTX.socket( zmq.PUB )
pass;                                   aPUB.bind( "tcp:10.0.0.10:23456" )
pass;                                   aPUB.setsockopt( zmq.CONFLATE, 1 )
pass;                                   aSUB = aCTX.socket( zmq.SUB )
pass;                                   aSUB.connect( "tcp://10.0.0.1:12345" )
pass;                                   aSUB.setsockopt( zmq.LINGER, 0 )
pass;                                   aSUB.setsockopt( zmq.SUBSCRIBE, "A:" )
pass;                                   print( "B: Started. Can Ctrl+C." )
while True:
      try:
           aPUB.send( "B: sending ...[{0:}]".format( time.ctime() );
           if ( 0 != aSUB.poll(  500, zmq.POLLIN ) ):
               print( "B:recv()'d: {0:} at {1:}".format( aSUB.recv( zmq.NOBLOCK ), time.ctime() )

      except KeyboardInterrupt:
           pass;                        print( "B: Ctrl+C'd. Will terminate" ); break
pass;
aPUB.close()
aSUB.close()
aCTX.term()

Node(s)-D(,E):

import time, zmq; aCTX = zmq.Context(); aPUB = aCTX.socket( zmq.PUB )
pass;                                   aPUB.bind( "tcp:10.0.0.100:34567" )
pass;                                   aPUB.setsockopt( zmq.CONFLATE, 1 )
pass;                                   aSUB = aCTX.socket( zmq.SUB )
pass;                                   aSUB.connect( "tcp://10.0.0.10:23456" )
pass;                                   aSUB.connect( "tcp://10.0.0.20:23456" )
pass;                                   aSUB.setsockopt( zmq.LINGER, 0 )
pass;                                   aSUB.setsockopt( zmq.SUBSCRIBE, "B:" )
pass;                                   aSUB.setsockopt( zmq.SUBSCRIBE, "C:" )
pass;                                   print( "D: Started. Can Ctrl+C." )
while True:
      try:
           aPUB.send( "D: sending ...[{0:}]".format( time.ctime() )
           if ( 0 != aSUB.poll(  250, zmq.POLLIN ) ):
               print( "D:recv()'d: {0:} at {1:}".format( aSUB.recv( zmq.NOBLOCK ), time.ctime() )
      except KeyboardInterrupt:
           pass;                        print( "D: Ctrl+C'd. Will terminate" ); break
pass;
aPUB.close()
aSUB.close()
aCTX.term()

Node-F:

import time, zmq; aCTX = zmq.Context(); aSUB = aCTX.socket( zmq.SUB )
pass;                                   aSUB.connect( "tcp://10.0.0.100:34567" )
pass;                                   aSUB.connect( "tcp://10.0.0.200:34567" )
pass;                                   aSUB.setsockopt( zmq.LINGER, 0 )
pass;                                   aSUB.setsockopt( zmq.SUBSCRIBE, "D:" )
pass;                                   aSUB.setsockopt( zmq.SUBSCRIBE, "E:" )
pass;                                   print( "F: Started. Can Ctrl+C." )
while True:
      try:
           print( "F:recv()'d: {0:} at {1:}".format( aSUB.recv(), time.ctime() )
      except KeyboardInterrupt:
           pass;                        print( "F: Ctrl+C'd. Will terminate" ); break
pass;
aSUB.close()
aCTX.term()
halfer
  • 19,824
  • 17
  • 99
  • 186
user3666197
  • 1
  • 6
  • 50
  • 92