3

How to create a network which allows for multiple publishers and multiple subscribers to those publishers?

Or is it absolutely necessary for a message broker to be used?

import time
import zmq
from multiprocessing import Process

def bind_pub(sleep_seconds, max_messages, pub_id):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5556")

    message = 0
    while True:
        socket.send_string("1 sending_func=bind_pub message_number=%s pub_id=%s" % (message, pub_id))
        message += 1
        if message >= max_messages:
            break
        time.sleep(sleep_seconds)

def bind_sub(sleep_seconds, max_messages, sub_id):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.bind("tcp://*:5556")
    socket.setsockopt_string(zmq.SUBSCRIBE, '1')

    message_n = 0
    while True:
        message = socket.recv_string()
        print(message + " receiving_func=bind_sub sub_id=%s" % sub_id)
        message_n += 1
        if message_n >= max_messages - 1:
            break
        time.sleep(sleep_seconds)

def conect_pub(sleep_seconds, max_messages, pub_id):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect("tcp://localhost:5556")

    message = 0
    while True:
        socket.send_string("1 sending_func=conect_pub message_number=%s pub_id=%s" % (message, pub_id))
        message += 1
        if message >= max_messages:
            break
        time.sleep(sleep_seconds)

def connect_sub(sleep_seconds, max_messages, sub_id):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5556")
    socket.setsockopt_string(zmq.SUBSCRIBE, '1')

    message_n = 0
    while True:
        message = socket.recv_string()
        print(message + " receiving_func=connect_sub sub_id=%s" % sub_id)
        message_n += 1
        if message_n >= max_messages - 1:
            break
        time.sleep(sleep_seconds)

When trying a bind_pub, connect_pub, connect_sub, connect_sub network architecture:

# bind_pub, connect_pub, connect_sub, connect_sub
n_messages = 4
p1 = Process(target=bind_pub, args=(1,n_messages,1))
p2 = Process(target=conect_pub, args=(1,n_messages,2))
p3 = Process(target=connect_sub, args=(0.1,n_messages,1))
p4 = Process(target=connect_sub, args=(0.1,n_messages,2))
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()

Results in pub_id=2 messages going missing:

1 sending_func=bind_pub message_number=1 pub_id=1 receiving_func=connect_sub sub_id=2
1 sending_func=bind_pub message_number=1 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=2 pub_id=1 receiving_func=connect_sub sub_id=2
1 sending_func=bind_pub message_number=2 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=3 pub_id=1 receiving_func=connect_sub sub_id=1
1 sending_func=bind_pub message_number=3 pub_id=1 receiving_func=connect_sub sub_id=2

Similarly running a connect_pub, connect_pub, connect_sub, bind_sub architecture:

# connect_pub, connect_pub, connect_sub, bind_sub
n_messages = 4
p1 = Process(target=conect_pub, args=(1,n_messages,1))
p2 = Process(target=conect_pub, args=(1,n_messages,2))
p3 = Process(target=bind_sub, args=(0.1,n_messages,1))
p4 = Process(target=connect_sub, args=(0.1,n_messages,2))
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()

Results in no messages being received by sub_id=2:

1 sending_func=conect_pub message_number=1 pub_id=1 receiving_func=bind_sub sub_id=1
1 sending_func=conect_pub message_number=1 pub_id=2 receiving_func=bind_sub sub_id=1
1 sending_func=conect_pub message_number=2 pub_id=1 receiving_func=bind_sub sub_id=1
Greg
  • 8,175
  • 16
  • 72
  • 125
  • 1
    so.. I dont get the accepted answer.. reading it is like eating broken glass.. obv the guy has IQ 3233.. he is just in a parallel universe.. I have `subscriber` socket. how do i connect it to multiple publishers? – Boppity Bop Apr 09 '21 at 14:14

2 Answers2

1

Well,
fair to mention that ZeroMQ is principally a Broker-less framework ,

this means the 2nd question is solved a priori - no, it is not only not absolutely necessary, it is also principally impossible ( if one does not implement a Broker-(semi-)persistence as a Zen-of-Zero standard ZeroMQ tools based layer an extra add-on ).


Next,
ZeroMQ tools are by far not "socket"-s as you know 'em :

This is an often re-articulated misconception, so let me repeat it in bold.

Beware:
ZeroMQ Socket()-instance is not a tcp-socket-as-you-know-it. Best read about the main conceptual differences in ZeroMQ hierarchy in less than a five seconds or other posts and discussions here.


Yet,
more important,
there seems to be no expressed need which is not covered :

ZeroMQ can either serve all of :

many-PUB-s : many-SUB-s           -or-  
 one-PUB   : many-SUB-s           -or- even  
many-PUB-s :  one-SUB

where all or part of those "many" could still get .connect()-ed to a single or more AccessPoints, so the produced topologies could go indeed wild ( for details kindly check the above offered link to a "five seconds" read ) so, one's own imagination seems to be the only ceiling in doing this.

For performance and latency envelopes, feel free to seek and read more in other posts.

user3666197
  • 1
  • 6
  • 50
  • 92
  • 1
    Could you elaborate how `many-PUB-s : many-SUB-s` architectures are made? That is, what type of connection the PUB / SUB side use `{ PUB/SUB | PUSH/PULL | PAIR/PAIR | XPUB/XSUB | ... | REQ/REP }` and how the `connect` or `bind` methods fit into it? – Greg Apr 12 '18 at 12:37
  • How can `many-PUB-s : many-SUB-s` be achieved if only one publisher or consumer is able to bind? – Greg Apr 18 '18 at 01:08
  • Imagine, each **`SUB`** undertakes many respective `.connect( PUB_1 )`, next `.connect( PUB_2 )`, next `.connect( PUB_3 )`, next `.connect( PUB_4 )` as long as feasible and achievable across the set of already `.bind( ... )`-**RTO**'d `PUB`-s. Thus also the `many-PUB-s : many-SUB-s` topologies work, still using the native `PUB/SUB` or `XPUB/XSUB` archetypes, as and where needed. – user3666197 Apr 18 '18 at 01:52
  • 1
    I provided a code examples of a many pub many sub architecture as well as the results. It does not seem to be possible. – Greg Apr 19 '18 at 02:14
  • Negative, Sir. You might notice that your added code does not establish the wished to have `many-PUB-s : many-SUB-s` as the `Socket()`-instance cannot meaningfully **`PUB.connect( another_PUB )`**, this simply will not fly. Feel free to re-design the infrastructure so as to meet the documented ZeroMQ principles & the `many-PUB-s : many-SUB-s` topology will start to work. – user3666197 Apr 19 '18 at 09:54
  • 3
    You're joking right? I cannot face palm myself hard enough in reaction to your response. In my original question I asked how it's possible to do and whether or not a message broker needs to be used. You said that it is possible. I went ahead and showed you code which are the only combinations which had a chance of working and not using a message broker, and now you tell me to use the patterns in the documentation. This is a joke, I don't know how you gained such a high reputation. And I have flagged your answer. Genuinely this is a joke. – Greg Apr 19 '18 at 10:59
1

It is certainly not necessary to use a broker in order to implement a many-to-many network, but a broker does simplify configuration since each node only needs to know the broker's address, not all of its peers.

Another possibility is a hybrid approach -- using a broker to exchange address information among peers so they can connect to each other directly. You can find an example here: https://github.com/nyfix/OZ/blob/master/doc/Naming-Service.md

WallStProg
  • 391
  • 4
  • 8
  • Totally agree with this, I have worked on a very similar hybrid solution. The best part is the data connection between sender and receiver are single hop and low latency, while the discovery service nodes/proxies can be duplicated behind a load balancer. More recently I have deployed the system on aws and putting the discovery parts in a scale set with load balancer makes for a very reliable system. – James Harvey May 12 '21 at 09:42