0

I'm trying to use the pyzmq PUB/SUB socket-archetypes inside a realm of multiprocessing.Process:

I have one subscriber:

import time
import collections    
import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.connect("tcp://localhost:5000")

nb_recv = 0
begin = time.time()
counter = collections.defaultdict(int)
while True:
    msg = socket.recv_json()
    print(msg)

And two different implementations of a publisher.

With this one, the subscriber receives the messages:

import zmq
from multiprocessing import Process

class Sender(object):

    def __init__(self):
        self._context = zmq.Context()
        pass

    def run(self):
        self._socket = self._context.socket(zmq.PUB)
        self._socket.bind("tcp://127.0.0.1:5000")
        seq_num = 0
        while True:
            msg = { "sequence": seq_num }
            self._socket.send_json(msg)
            seq_num += 1

if __name__ == "__main__":
    s = Sender()
    p = Process(target=s.run)
    p.start()
    p.join()

But with this one ( where the only difference is that the creation of the socket is in the constructor, instead of being in the run() class-method ), the subscriber does not receive any message:

import zmq
from multiprocessing import Process

class Sender(object):

    def __init__(self):
        self._context = zmq.Context()
        self._socket = self._context.socket(zmq.PUB) # <---------
        pass

    def run(self):
        self._socket.bind("tcp://127.0.0.1:5000")
        seq_num = 0
        while True:
            msg = { "sequence": seq_num }
            self._socket.send_json(msg)
            seq_num += 1

if __name__ == "__main__":
    s = Sender()
    p = Process(target=s.run)
    p.start()
    p.join()

When I replace multiprocessing.Process by a threading.Thread, both classes work well, but I did not find any explanation on the documentation.

user3666197
  • 1
  • 6
  • 50
  • 92

1 Answers1

0

You're creating the object in one process and trying to execute a method of it in another.

It is possible to create or even share objects between multiprocessing processes with multiprocessing.Manager but since this object holds a non-shareable resource (a network socket), you're better off creating it inside the worker process unless you wish to walk the minefield of pickling and using an object with non-picklable fields.

ivan_pozdeev
  • 33,874
  • 19
  • 107
  • 152