122

The documentation for the multiprocessing module shows how to pass a queue to a process started with multiprocessing.Process. But how can I share a queue with asynchronous worker processes started with apply_async? I don't need dynamic joining or anything else, just a way for the workers to (repeatedly) report their results back to base.

import multiprocessing

def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    # This call raises the RuntimeError described below:
    workers = pool.apply_async(worker, (33, q))

This fails with: RuntimeError: Queue objects should only be shared between processes through inheritance. I understand what this means, and I understand the advice to inherit rather than require pickling/unpickling (and all the special Windows restrictions). But how do I pass the queue in a way that works? I can't find an example, and I've tried several alternatives that failed in various ways. Help please?

alexis
  • For future readers who want to share a queue with workers without using a manager (and without sacrificing performance), this answer on creating a picklable queue instead might be worth checking out: https://stackoverflow.com/a/75247561/16310741 – Charchit Agarwal Feb 19 '23 at 14:48
  • @charchit, thanks! No offense, though, but your solution in the linked answer looks complicated: lots of code, a long discussion in the comments, ending with a "disclaimer" from the OP. For somebody actually coding an e-commerce site with zillions of requests, is there really a _drastic_ performance improvement with your solution that would justify all this effort? The 6-liner in the accepted answer here is about as simple as they get. – alexis Feb 20 '23 at 07:52
  • Relatively speaking, the performance hit of introducing managers in your code will be noticeable. Firstly, when using `Manager.Queue()`, everything is pickled/unpickled twice instead of once as with a normal queue (once to send it to/from the manager process and again to retrieve/put the object on the queue). Secondly, every method call on a managed object takes 1000x more time to resolve before the method is called. These become major bottlenecks in performance-sensitive apps, but may not actually be that bad (1/2). – Charchit Agarwal Feb 20 '23 at 11:25
  • This is because, speaking in absolute terms, even though name resolution is 1000 times slower on a managed object, it still only introduces a delay of 0.001s, and even though the double pickling/unpickling may become a severe bottleneck for code that regularly puts big items on a queue, its effect might actually be negligible if all you're putting on a queue is small strings and text. So whether or not it is worth the extra hassle to make picklable queues depends on the technicalities of your use case, but for most cases, it probably is better to just use `Manager.Queue()` (2/2). – Charchit Agarwal Feb 20 '23 at 11:26
  • Thanks for the carefully qualified explanation. "Relatively speaking" the execution time could go up by a factor of 10, and I still wouldn't care as long as my task completes in a few seconds... or even minutes, considering the complexity of the alternative (ok, perceived complexity -- your solution is scary :-D). In short, I don't mean to be contrary, but if I actually needed this, I'd hold out for actual benchmark tests before making a decision either way; a rough sketch of such a benchmark follows these comments. – alexis Feb 20 '23 at 15:05
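
A rough benchmark along the lines discussed above (not from the thread; the item count and names are illustrative, and the actual numbers will vary by platform). It times pushing small items from a child process through a plain multiprocessing.Queue versus a Manager().Queue():

import multiprocessing
import time

def producer(q, n):
    # Push n small items through the queue as fast as possible.
    for i in range(n):
        q.put(i)

if __name__ == '__main__':
    n = 10_000
    m = multiprocessing.Manager()  # keep a reference so the manager process stays alive
    for label, q in (("multiprocessing.Queue", multiprocessing.Queue()),
                     ("Manager().Queue()", m.Queue())):
        p = multiprocessing.Process(target=producer, args=(q, n))
        start = time.perf_counter()
        p.start()
        for _ in range(n):
            q.get()
        p.join()
        print(f"{label}: {time.perf_counter() - start:.2f}s")

Whether the managed queue's extra pickling and proxy overhead matters in practice depends, as noted in the comments above, on item sizes and how often the queue is hit.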

2 Answers

165

Try using multiprocessing.Manager to manage your queue and make it accessible to the different workers.

import multiprocessing

def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()  # a managed queue proxy, which can safely be passed to pool workers
    workers = pool.apply_async(worker, (33, q))
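
Since the snippet above never reads anything back, here is a slightly fuller sketch (not part of the original answer; the job count and names are illustrative) in which several jobs report through the managed queue and the parent drains it afterwards:

import multiprocessing

def worker(name, que):
    # Each job reports back through the shared managed queue.
    que.put("%d is done" % name)

if __name__ == '__main__':
    m = multiprocessing.Manager()
    q = m.Queue()
    with multiprocessing.Pool(processes=3) as pool:
        # All jobs share the same queue proxy.
        jobs = [pool.apply_async(worker, (i, q)) for i in range(5)]
        for job in jobs:
            job.get()  # block until each job has finished
    # Drain everything the workers reported.
    while not q.empty():
        print(q.get())
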
enderskill
  • That did it, thanks! There was an unrelated problem with the async call in my original code, so I copied the fix to your answer too. – alexis Mar 30 '12 at 21:15
  • 26
    Any explanation why `queue.Queue()` is not suitable for this? – mrgloom Jul 08 '19 at 17:31
  • 6
    @mrgloom: `queue.Queue` was built for threading, using in-memory locks. In a multiprocessing environment, each subprocess would get its own copy of a `queue.Queue()` instance in its own memory space, since subprocesses don't share memory (mostly). – LeoRochael May 13 '20 at 14:26
  • 1
    @alexis How to get the elements from the Manager().Queue() after multiple workers have inserted data into it? – MSS Jul 21 '20 at 13:48
  • [Multiprocessing.Queue.get()](https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Queue.get) – alexis Sep 24 '20 at 16:08
  • @LeoRochael, this does not explain why `queue.Queue` is still usable in a multiprocessing environment when "using inheritance", i.e. with Process and not Pool. – g.pickardou Jun 02 '22 at 06:31
  • If by "using inheritance" you mean that you get an instance of a `queue.Queue` after a fork, it is "usable" only in the sense that it doesn't cause errors. Due to the copy-on-write semantics of memory in the OS, you're not actually sharing the `queue.Queue` instance with the parent process, even though it seems to work. You just have an independent copy of the `queue.Queue` in each process, whether parent or child. – LeoRochael Nov 10 '22 at 14:56
26

multiprocessing.Pool already has a shared result queue; there is no need to additionally involve a Manager.Queue. Manager.Queue is a queue.Queue (a multithreading queue) under the hood, located in a separate server process and exposed via proxies. This adds extra overhead compared to Pool's internal queue. Unlike with Pool's native result handling, results in the Manager.Queue are also not guaranteed to be ordered.

The worker processes are not started by .apply_async(); that already happens when you instantiate Pool. What is started when you call pool.apply_async() is a new "job". Pool's worker processes run the multiprocessing.pool.worker function under the hood. This function takes care of processing new "tasks" transferred over Pool's internal Pool._inqueue and of sending results back to the parent over Pool._outqueue. Your specified func will be executed within multiprocessing.pool.worker; func only has to return something, and the result will automatically be sent back to the parent.

.apply_async() immediately (asynchronously) returns an AsyncResult object (alias for ApplyResult). You need to call .get() (which blocks) on that object to receive the actual result. Another option would be to register a callback function, which gets fired as soon as the result becomes ready (a sketch follows at the end of this answer).

from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):  # do stuff
        pass
    return i

if __name__ == '__main__':

    with Pool(4) as pool:
        print(pool._outqueue)  # DEMO
        results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
        # `.apply_async()` immediately returns AsyncResult (ApplyResult) object
        print(results[0])  # DEMO
        results = [res.get() for res in results]
        print(f'result: {results}')       

Example Output:

<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Note: Specifying the timeout parameter for .get() will not stop the actual processing of the task within the worker; it only unblocks the waiting parent by raising a multiprocessing.TimeoutError.
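
To illustrate the callback option mentioned above, here is a minimal sketch (the collecting list is illustrative; busy_foo is the same dummy function as before). Results are appended as they complete, without ever blocking on .get():

from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):
        pass
    return i

if __name__ == '__main__':
    results = []
    with Pool(4) as pool:
        for i in range(10):
            # The callback runs in a result-handler thread of the parent
            # as soon as each result arrives; error_callback fires if the
            # task raises an exception.
            pool.apply_async(busy_foo, (i,),
                             callback=results.append,
                             error_callback=print)
        pool.close()  # no more submissions
        pool.join()   # wait until all tasks have completed
    print(sorted(results))  # callbacks fire in completion order, hence the sort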

Darkonaut
  • Interesting, I'll try it out first chance I get. It certainly didn't work this way in 2012. – alexis Apr 08 '19 at 18:12
  • @alexis Python 2.7 (2010), which is what's relevant here, is only missing the context manager and the `error_callback` parameter for `apply_async`, so not much has changed since. – Darkonaut Apr 08 '19 at 18:26
  • I found the callback function to be the most useful, especially when combined with a partial function to allow using a regular list to collect async results, as described here: https://gist.github.com/Glench/5789879 – user5359531 Oct 06 '20 at 00:32
  • This doesn't work if you want a stream of data from a child worker, not a single result. – user48956 May 24 '22 at 21:46