
I have a few basic questions about using Python's multiprocessing module:

import multiprocessing

class Someparallelworkerclass(object):

    def __init__(self):
        self.num_workers = 4
        self.work_queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.JoinableQueue()

    def someparallellazymethod(self):
        p = multiprocessing.Process(target=self.worktobedone)
        p.start()

    def worktobedone(self):
        # get data from work_queue
        # put back result in result_queue
        pass

Is it necessary to pass work_queue and result_queue as args to Process? Does the answer depend on the OS? The more fundamental question is: does the child process get a copied (COW) address space from the parent process, and hence know the definition of the class and its methods? If so, how does it know that the queues are to be shared for IPC, and that it shouldn't make duplicates of work_queue and result_queue in the child process? I tried searching online, but most of the documentation I found was vague and didn't go into enough detail about what exactly happens underneath.

dano
user179156

3 Answers


It's actually not necessary to include the queues in the args argument in this case, no matter what platform you're using. The reason is that even though it doesn't look like you're explicitly passing the two JoinableQueue instances to the child, you actually are - via self. Because self is explicitly being passed to the child, and the two queues are a part of self, they end up being passed along to the child.
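To make this concrete, here's a minimal sketch (Python 3; the Worker name and the doubling task are my own illustration, not from the question) where the queues reach the child only through self:

import multiprocessing

class Worker(object):
    def __init__(self):
        self.work_queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.JoinableQueue()

    def worktobedone(self):
        item = self.work_queue.get()     # the queue was never in args
        self.result_queue.put(item * 2)
        self.work_queue.task_done()

    def someparallellazymethod(self):
        # the queues travel to the child only as attributes of self
        p = multiprocessing.Process(target=self.worktobedone)
        p.start()
        return p

if __name__ == '__main__':
    w = Worker()
    p = w.someparallellazymethod()
    w.work_queue.put(21)
    print(w.result_queue.get())  # prints 42
    p.join()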

On Linux, this happens via os.fork(), which means that file descriptors used by the multiprocessing.connection.Connection objects that the Queue uses internally for inter-process communication are inherited by the child (not copied). Other parts of the Queue become copy-on-write, but that's ok; multiprocessing.Queue is designed so that none of the pieces that need to be copied actually need to stay in sync between the two processes. In fact, many of the internal attributes get reset after the fork occurs:

def _after_fork(self):
    debug('Queue._after_fork()')
    self._notempty = threading.Condition(threading.Lock())
    self._buffer = collections.deque()
    self._thread = None
    self._jointhread = None
    self._joincancelled = False
    self._closed = False
    self._close = None
    self._send = self._writer.send  # _writer is a Connection object
    self._recv = self._reader.recv
    self._poll = self._reader.poll
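If you want to see the descriptor inheritance directly, here's a Linux/Unix-only sketch using a bare os.fork() (this bypasses multiprocessing.Process entirely, so the after-fork hooks above never run; it's purely illustrative):

import multiprocessing
import os

q = multiprocessing.Queue()
pid = os.fork()
if pid == 0:
    # child: the Connection file descriptors were inherited, not copied
    q.put('sent over the inherited pipe')
    q.close()
    q.join_thread()  # let the feeder thread flush before exiting
    os._exit(0)
else:
    print(q.get())   # parent reads what the child wrote
    os.waitpid(pid, 0)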

So that covers Linux. How about Windows? Windows doesn't have fork, so it will need to pickle self to send it to the child, and that includes pickling our Queues. Now, normally if you try to pickle a multiprocessing.Queue, it fails:

>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> import pickle
>>> pickle.dumps(q)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/copy_reg.py", line 84, in _reduce_ex
    dict = getstate()
  File "/usr/local/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
    assert_spawning(self)
  File "/usr/local/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
    ' through inheritance' % type(self).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

But this is actually an artificial limitation. multiprocessing.Queue objects can be pickled in some cases - how else could they be sent to child processes in Windows? And indeed, we can see that if we look at the implementation:

def __getstate__(self):
    assert_spawning(self)
    return (self._maxsize, self._reader, self._writer,
            self._rlock, self._wlock, self._sem, self._opid)

def __setstate__(self, state):
    (self._maxsize, self._reader, self._writer,
     self._rlock, self._wlock, self._sem, self._opid) = state
    self._after_fork()

__getstate__, which is called when pickling an instance, has an assert_spawning call in it, which makes sure we're actually spawning a process while attempting the pickle*. __setstate__, which is called while unpickling, is responsible for calling _after_fork.

So how are the Connection objects used by the queues maintained when we have to pickle? It turns out there's a multiprocessing sub-module that does exactly that - multiprocessing.reduction. The comment at the top of the module states it pretty clearly:

#
# Module to allow connection and socket objects to be transferred
# between processes
#

On Windows, the module ultimately uses the DuplicateHandle API provided by Windows to create a duplicate handle that the child process' Connection object can use. So while each process gets its own handle, they're exact duplicates - any action made on one is reflected on the other:

The duplicate handle refers to the same object as the original handle. Therefore, any changes to the object are reflected through both handles. For example, if you duplicate a file handle, the current file position is always the same for both handles.
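The same machinery is exposed as send_handle and recv_handle in multiprocessing.reduction (Python 3 names). A rough Unix-flavoured sketch, where the transfer rides on SCM_RIGHTS over a Unix socket rather than DuplicateHandle:

import multiprocessing
from multiprocessing.connection import Connection
from multiprocessing.reduction import send_handle, recv_handle

def child(conn):
    fd = recv_handle(conn)          # receive the transferred descriptor
    with Connection(fd) as reader:  # rebuild a Connection around it
        print(reader.recv())

if __name__ == '__main__':
    reader, writer = multiprocessing.Pipe(duplex=False)
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=child, args=(child_conn,))
    p.start()
    send_handle(parent_conn, reader.fileno(), p.pid)
    writer.send('hello through a transferred handle')
    p.join()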

* See this answer for more information about assert_spawning

dano

The child process doesn't have the queues in its closure; its instances of the queues reference different areas of memory. When using queues the way you intend, you must pass them as args to the function. One solution I like is to use functools.partial to curry your functions with the queues you want, adding them permanently to the function's closure and letting you spin up multiple processes to perform the same task with the same IPC channel.
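For instance (a rough sketch; the worker function and the worker count are illustrative):

import functools
import multiprocessing

def worker(work_queue, result_queue):
    item = work_queue.get()
    result_queue.put(item + 1)

if __name__ == '__main__':
    work_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    # curry the queues into the callable once, up front
    task = functools.partial(worker, work_queue, result_queue)

    procs = [multiprocessing.Process(target=task) for _ in range(4)]
    for p in procs:
        p.start()
    for i in range(4):
        work_queue.put(i)
    print(sorted(result_queue.get() for _ in range(4)))  # [1, 2, 3, 4]
    for p in procs:
        p.join()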

ragingSloth

The child process does not get a copied address space. The child is a completely separate Python process with nothing shared. Yes, you have to pass the queues to the child. When you do so, multiprocessing automatically handles the sharing via IPC. See https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes.
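A minimal sketch of that documented pattern (names are illustrative):

import multiprocessing

def work(work_queue, result_queue):
    # the queues arrive explicitly as arguments
    item = work_queue.get()
    result_queue.put(item * item)

if __name__ == '__main__':
    work_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=work, args=(work_queue, result_queue))
    p.start()
    work_queue.put(7)
    print(result_queue.get())  # 49
    p.join()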

Matt
  • This isn't quite true. On Linux, the child process is forked from the parent, so it actually does get a copy-on-write address space from the parent. And in fact, you'll be able to `put` to the queue in the child and `get` the result from the parent without explicitly passing the `Queue` to the child on both Linux *and* Windows. The only cases it doesn't seem to work are Python 3.4+ with Linux using the `'spawn'` or `'forkserver'` contexts. – dano Oct 07 '14 at 15:25
  • Actually, I take back that last sentence. That was caused by an error on my part. You can always pass the Queue objects implicitly, no matter the context/platform. – dano Oct 07 '14 at 16:44