I'm trying to use pyzmq PUB/SUB sockets inside a multiprocessing.Process:
I have one subscriber:
import time
import collections

import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.connect("tcp://localhost:5000")

nb_recv = 0
begin = time.time()
counter = collections.defaultdict(int)

while True:
    msg = socket.recv_json()
    print(msg)
And two different implementations of a publisher.
With this one, the subscriber receives the messages:
import zmq
from multiprocessing import Process


class Sender(object):
    def __init__(self):
        self._context = zmq.Context()
        pass

    def run(self):
        self._socket = self._context.socket(zmq.PUB)
        self._socket.bind("tcp://127.0.0.1:5000")
        seq_num = 0
        while True:
            msg = { "sequence": seq_num }
            self._socket.send_json(msg)
            seq_num += 1


if __name__ == "__main__":
    s = Sender()
    p = Process(target=s.run)
    p.start()
    p.join()
But with this one (where the only difference is that the socket is created in the constructor instead of in the run() method), the subscriber does not receive any messages:
import zmq
from multiprocessing import Process


class Sender(object):
    def __init__(self):
        self._context = zmq.Context()
        self._socket = self._context.socket(zmq.PUB) # <---------
        pass

    def run(self):
        self._socket.bind("tcp://127.0.0.1:5000")
        seq_num = 0
        while True:
            msg = { "sequence": seq_num }
            self._socket.send_json(msg)
            seq_num += 1


if __name__ == "__main__":
    s = Sender()
    p = Process(target=s.run)
    p.start()
    p.join()
When I replace multiprocessing.Process with a threading.Thread, both classes work well, but I did not find any explanation in the documentation.
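
For reference, the thread-based variant looks roughly like the sketch below, with the socket still created in the constructor. This is only a minimal illustration: the `endpoint` and `max_messages` parameters, the short sleep, and the `start_in_thread` helper are assumptions added so the example can terminate and be run in isolation; they are not part of the code above.

```python
import threading
import time

import zmq


class Sender(object):
    def __init__(self, endpoint="tcp://127.0.0.1:5000", max_messages=None):
        # Same layout as the second publisher: the PUB socket is created
        # in the constructor, not in run().
        self._context = zmq.Context()
        self._socket = self._context.socket(zmq.PUB)
        # Hypothetical parameters, for illustration only.
        self._endpoint = endpoint
        self._max_messages = max_messages  # None means "loop forever"

    def run(self):
        self._socket.bind(self._endpoint)
        seq_num = 0
        while self._max_messages is None or seq_num < self._max_messages:
            self._socket.send_json({"sequence": seq_num})
            seq_num += 1
            time.sleep(0.01)  # assumed pacing, so a late subscriber can catch up
        self._socket.close()


def start_in_thread(sender):
    # Hypothetical helper: run the sender in a thread of the current
    # process instead of in a multiprocessing.Process.
    t = threading.Thread(target=sender.run)
    t.start()
    return t
```

Calling `start_in_thread(Sender()).join()` corresponds to swapping `Process` for `Thread` in the `__main__` block above; with this variant the subscriber does receive the messages.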