I find it hard to believe that I've run into this issue, since it seems like it would be a major bug in Python's multiprocessing module. Anyway, the problem is that whenever I pass a multiprocessing.Queue to a multiprocessing.Pool worker as an argument, the worker never executes its code. I've been able to reproduce this even with a very simple test that is a slightly modified version of example code from the Python docs.

Here is the original version of the example code for Queues:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()

Here is my modified version of the example code for Queues:

from multiprocessing import Queue, Pool

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Pool(1)
    p.apply_async(f,args=(q,))
    print(q.get()) # prints "[42, None, 'hello']"
    p.close()
    p.join()

All I've done is make p a process pool of size 1 instead of a multiprocessing.Process object, and the result is that the code hangs on the print statement forever because nothing is ever written to the Queue! Of course I tested the example in its original form and it works fine. My OS is Windows 10 and my Python version is 3.5.x. Does anyone have any idea why this is happening?

Update: I still have no idea why this example works with a multiprocessing.Process and not a multiprocessing.Pool, but I found a workaround I'm content with (Alex Martelli's answer). Apparently you can just make a global list of multiprocessing.Queues and pass each process an index to use (see the sketch below); I'm going to avoid a managed queue because they are slower. Thanks a_guest for the links.
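
Roughly, the idea looks like this (a simplified sketch, not my actual code: the queues are handed to each worker through Pool's initializer/initargs, which works because that happens while the worker processes are being created rather than per task):

from multiprocessing import Pool, Queue

queues = None  # set in each worker process by the initializer

def init_worker(qs):
    # Runs once per worker process; receiving the queues here is allowed
    # because it happens while the worker process is being created.
    global queues
    queues = qs

def f(idx):
    # Workers only receive an index and look up "their" queue globally.
    queues[idx].put([42, idx, 'hello'])

if __name__ == '__main__':
    qs = [Queue(), Queue()]
    p = Pool(2, initializer=init_worker, initargs=(qs,))
    for i in range(2):
        p.apply_async(f, args=(i,))
    for i in range(2):
        print(qs[i].get())
    p.close()
    p.join()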

profPlum
  • You may want to take a look at [#1](https://stackoverflow.com/a/30039159/3767239), [#2](https://stackoverflow.com/q/3217002/3767239), [#3](https://stackoverflow.com/q/9908781/3767239), [#4](https://stackoverflow.com/a/42659752/3767239), [#5](https://stackoverflow.com/a/25558333/3767239). It seems the reason is that the `Queue` instance cannot be pickled. However, I don't understand why this deadlocks the underlying in-queue of the process. Using `Ctrl+C` with a pool size of 2 revealed that it was stuck at `task = inqueue.get()`, where it would request the target function. That's a bit puzzling. – a_guest Jul 18 '17 at 23:59
  • Note that with asynchronous programming you don't need to deal with result queues manually - `apply_async` returns an [`AsyncResult`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult) instance which can be used to get the result: `result.get()`. This uses an underlying result (out-) queue, so you simply need to `return` from your target function. Also, if you call `result.get()` and you passed a `Queue` instance as an argument to the target function, it will raise a `RuntimeError`. However, I'm curious why this doesn't happen for your example. – a_guest Jul 19 '17 at 00:04
  • See my comment to your answer. My goal isn't a "result queue"; this is just a trivial example. I need a queue that is continually written to and processed. – profPlum Jul 19 '17 at 15:09

1 Answer


Problem

When you call apply_async it returns an AsyncResult object and leaves the workload distribution to a separate thread (see also this answer). This thread encounters the problem that the Queue object can't be pickled, and therefore the requested work can't be distributed (and thus never gets executed). We can see this by calling AsyncResult.get:

r = p.apply_async(f,args=(q,))
r.get()

which raises a RuntimeError:

RuntimeError: Queue objects should only be shared between processes through inheritance

However, this RuntimeError is only raised in the main thread once you request the result, because it actually occurred in a different thread (and thus needed a way to be transmitted back).

So what happens when you do

p.apply_async(f,args=(q,))

is that the target function f is never invoked, because one of its arguments (q) can't be pickled. Therefore q never receives an item and remains empty, and for that reason the call to q.get in the main thread blocks forever.
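
As a side note, you don't have to call get just to surface such hidden errors: apply_async also accepts an error_callback which, as far as I can tell, gets invoked with exactly this kind of exception (a small sketch of the idea):

from multiprocessing import Queue, Pool

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Pool(1)
    # error_callback receives the exception, so the pickling error becomes
    # visible without blocking on the result.
    p.apply_async(f, args=(q,), error_callback=print)
    p.close()
    p.join()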

Solution

With apply_async you don't have to manage result queues manually; they are readily provided to you in the form of AsyncResult objects. So you can modify the code to simply return from the target function:

from multiprocessing import Pool

def f():
    return [42, None, 'hello']

if __name__ == '__main__':
    p = Pool(1)
    result = p.apply_async(f)
    print(result.get())  # prints [42, None, 'hello']
    p.close()
    p.join()
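
If you really need a queue that the workers keep writing to (as discussed in the comments), one option is a Manager queue: its proxy object can be pickled and therefore passed as a task argument, at the cost of routing every operation through an extra server process. A minimal sketch:

from multiprocessing import Manager, Pool

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    manager = Manager()
    q = manager.Queue()  # a proxy object, which can be pickled
    p = Pool(1)
    p.apply_async(f, args=(q,))
    print(q.get())  # prints [42, None, 'hello']
    p.close()
    p.join()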
a_guest
  • Interesting, but I don't see how the code works when you just use a multiprocessing.Process instead of a multiprocessing.Pool - they both create new processes, so wouldn't the Queue need to be pickled for both methods? Also, using the AsyncResult workaround isn't really viable for me because I need a bunch of worker Processes continually writing to a Queue that is then read and processed by another worker Process. – profPlum Jul 19 '17 at 15:06
  • @profPlum For those questions I'd like to refer to [this answer](https://stackoverflow.com/a/45184127/3767239). The essence is that the `Pool` starts the processes right away while the `Process` gets started after it received the `Queue` instance (no pickling required here, the process is not yet running). You can use [`Manager`](https://docs.python.org/3/library/multiprocessing.html#managers)s to share objects between processes or use the `initializer` and `initargs` keyword arguments of [`Pool`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool). – a_guest Jul 19 '17 at 21:21
  • Ah ok, I'm not really sure why the fact that the process is already running requires pickling, but with that explanation and the info about initializer and initargs I'll accept your answer. – profPlum Jul 21 '17 at 01:45
  • @profPlum It's due to the way information is shared between processes. On Unix-like systems they use sockets for that purpose, and in order to send a (Python) object through a socket it gets pickled. The `Process` instance in Python will spawn off a system process when started, so prior to this you can share Python objects with that instance as usual. But for sharing with a running process they have to use the OS's mechanisms for inter-process communication. – a_guest Jul 21 '17 at 08:40
  • I see, thanks for the more detailed explanation, it makes more sense now. – profPlum Jul 21 '17 at 18:01