I have 3 processes; let's call them `host`, `worker1` and `worker2`. I want `worker1` and `worker2` to be able to communicate with each other directly via PUB/SUB sockets (with `host` interjecting intermittently), so I have the following setup:
```python
# Each snippet runs in its own process, with ctx = zmq.Context().

# host
socket = ctx.socket(zmq.PUB)
socket.bind('ipc:///tmp/comms')

# worker1
socket = ctx.socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
socket.send(b'worker1')

# worker2
socket = ctx.socket(zmq.SUB)
socket.connect('ipc:///tmp/comms')
socket.setsockopt(zmq.SUBSCRIBE, b'worker1')
socket.recv()
```
As of now, this setup doesn't work: `worker1` sends fine, but `worker2` never seems to receive the message. However, if I change the setup to this:
```python
# host
socket = ctx.socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')

# worker1
socket = ctx.socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
socket.send(b'worker1')

# worker2
socket = ctx.socket(zmq.SUB)
socket.bind('ipc:///tmp/comms')
socket.setsockopt(zmq.SUBSCRIBE, b'worker1')
socket.recv()
```
it works just fine. However, if I also `bind` in `host`, it stops working again.
Why is this? What happens if I now have a `workerN` that also needs to subscribe to `worker1`? How can I bind from all the processes? What are these bind/connect semantics? Isn't `host`, being the long-lived process, doing the right thing by `bind`ing, and if so, why is `worker2` failing to receive when it is `connect`ing?
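The following toy model (plain Python, not pyzmq) encodes one hypothesis for the behavior above, so the two setups can be compared side by side. The names `Sock`, `Network`, `setup1` and `setup2` are hypothetical, and the rules are assumptions: bind/connect only decides who listens on an address, a link forms only between compatible socket types (a PUB dialling another PUB gets none), and a PUB delivers only to SUB sockets it is directly linked to, never relaying what other sockets sent. It does not model two sockets binding the same address, or connecting before binding.

```python
# Toy model of PUB/SUB reachability (plain Python, NOT pyzmq).
# All names are hypothetical. Assumptions encoded here:
#   1. bind/connect only decides who listens and who dials; once linked,
#      messages flow PUB -> SUB regardless of which side bound.
#   2. Only compatible types get a link (PUB<->SUB); PUB->PUB gets none.
#   3. A PUB delivers only to SUB peers it is directly linked to and
#      never relays messages that other sockets sent.

COMPATIBLE = {("PUB", "SUB"), ("SUB", "PUB")}


class Sock:
    def __init__(self, name, kind):
        self.name = name
        self.kind = kind
        self.peers = []      # directly linked, compatible sockets
        self.prefixes = []   # SUB subscription prefixes
        self.inbox = []      # messages delivered to a SUB

    def subscribe(self, prefix):
        self.prefixes.append(prefix)

    def send(self, msg):
        # Push only to linked peers whose subscriptions match the message.
        for peer in self.peers:
            if any(msg.startswith(p) for p in peer.prefixes):
                peer.inbox.append(msg)


class Network:
    def __init__(self):
        self.listeners = {}  # address -> socket that bound it

    def bind(self, sock, addr):
        self.listeners[addr] = sock

    def connect(self, sock, addr):
        listener = self.listeners.get(addr)
        if listener is not None and (sock.kind, listener.kind) in COMPATIBLE:
            sock.peers.append(listener)
            listener.peers.append(sock)


def setup1():
    # host binds a PUB; worker1 (PUB) and worker2 (SUB) connect to it.
    net = Network()
    host, w1, w2 = Sock("host", "PUB"), Sock("worker1", "PUB"), Sock("worker2", "SUB")
    net.bind(host, "ipc:///tmp/comms")
    net.connect(w1, "ipc:///tmp/comms")  # PUB -> PUB: no link
    net.connect(w2, "ipc:///tmp/comms")  # SUB -> PUB: linked to host only
    w2.subscribe(b"worker1")
    w1.send(b"worker1")
    return w2.inbox


def setup2():
    # worker2 binds a SUB; host and worker1 (both PUB) connect to it.
    net = Network()
    host, w1, w2 = Sock("host", "PUB"), Sock("worker1", "PUB"), Sock("worker2", "SUB")
    net.bind(w2, "ipc:///tmp/comms")
    net.connect(host, "ipc:///tmp/comms")
    net.connect(w1, "ipc:///tmp/comms")
    w2.subscribe(b"worker1")
    w1.send(b"worker1")
    return w2.inbox


print(setup1())  # []
print(setup2())  # [b'worker1']
```

Under these assumptions, `setup1()` leaves `worker2`'s inbox empty because its only link is to `host`, while `setup2()` delivers because `worker1` is linked straight to `worker2`.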
MWE: https://gist.github.com/ooblahman/f8f9724b9995b9646ebdb79d26afd68a
```python
import zmq
import time
from multiprocessing import Process

addr = 'ipc:///tmp/test_zmq'


def worker1():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.connect(addr)
    while True:
        sock.send(b'worker1')
        print('worker1 sent')
        time.sleep(1)


def worker2():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.SUB)
    sock.connect(addr)  # Change this to bind
    sock.setsockopt(zmq.SUBSCRIBE, b'worker1')
    while True:
        sock.recv()
        print('worker2 heard')


def main():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.bind(addr)  # Change this to connect (or remove entirely)
    p1 = Process(target=worker1)
    p2 = Process(target=worker2)
    p1.start()
    p2.start()
    p1.join()


if __name__ == '__main__':
    main()
```
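One arrangement the ZeroMQ guide describes for many-to-many PUB/SUB is a forwarder: the long-lived `host` binds an XSUB socket for publishers and an XPUB socket for subscribers and relays between them with `zmq.proxy`, so every worker only ever connects and any number of `workerN` subscribers can join. The sketch below is a minimal illustration under stated assumptions, not the MWE's own design: it uses threads and hypothetical `inproc://` endpoints (`PUB_SIDE`, `SUB_SIDE`) to stay in one process, whereas the MWE would use separate processes and `ipc://` addresses.

```python
import threading

import zmq

# Hypothetical endpoints. inproc keeps this sketch in one process; separate
# processes would use ipc:// addresses instead.
PUB_SIDE = "inproc://publishers"   # worker PUB sockets connect here
SUB_SIDE = "inproc://subscribers"  # worker SUB sockets connect here


def forwarder(ctx, ready):
    # The long-lived host binds both ends and relays between them.
    xsub = ctx.socket(zmq.XSUB)
    xpub = ctx.socket(zmq.XPUB)
    xsub.bind(PUB_SIDE)
    xpub.bind(SUB_SIDE)
    ready.set()
    try:
        zmq.proxy(xsub, xpub)  # blocks until the context is terminated
    except zmq.ContextTerminated:
        pass
    finally:
        xsub.close(linger=0)
        xpub.close(linger=0)


def main():
    ctx = zmq.Context()
    ready = threading.Event()
    host = threading.Thread(target=forwarder, args=(ctx, ready))
    host.start()
    ready.wait()

    # worker1: a PUB that connects (never binds).
    pub = ctx.socket(zmq.PUB)
    pub.connect(PUB_SIDE)

    # worker2 (or any workerN): a SUB that connects (never binds).
    sub = ctx.socket(zmq.SUB)
    sub.connect(SUB_SIDE)
    sub.setsockopt(zmq.SUBSCRIBE, b"worker1")

    received = None
    # Subscriptions take a moment to propagate through the proxy, so keep
    # publishing until something arrives (bounded to roughly 5 s).
    for _ in range(50):
        pub.send(b"worker1")
        if sub.poll(timeout=100):
            received = sub.recv()
            break

    pub.close(linger=0)
    sub.close(linger=0)
    ctx.term()  # makes zmq.proxy raise ContextTerminated in the host thread
    host.join()
    return received


if __name__ == "__main__":
    print(main())
```

Here `host` is the only party that binds; the trade-off is one extra hop through the forwarder for every message.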