43

I want a long-running process to return its progress over a Queue (or something similar) which I will feed to a progress bar dialog. I also need the result when the process is completed. A test example here fails with `RuntimeError: Queue objects should only be shared between processes through inheritance`.

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"

def main():
    q = multiprocessing.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

I've been able to get this to work using individual Process objects (where I am allowed to pass a Queue reference), but then I don't have a pool to manage the many processes I want to launch. Any advice on a better pattern for this?

Tom Burrows
David
  • It's not an answer to your question, but try the `execnet` library for multi-process mappings. The built-in `multiprocessing` has some issues still to be fixed (see the Python tracker). Besides that its source code is quite large and complicated. The `execnet` library looks much better to me than `multiprocessing`. – Andrey Vlasovskikh Jul 10 '10 at 01:26

2 Answers

58

The following code seems to work:

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"


def main():
    manager = multiprocessing.Manager()
    q = manager.Queue()
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

Note that the queue comes from manager.Queue() rather than multiprocessing.Queue(): a manager's queue is a proxy object that can be pickled and passed to pool workers, which is exactly what a plain multiprocessing.Queue refuses to do. Thanks, Alex, for pointing me in this direction.
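For reference, here is a sketch of the same Manager-based pattern in modern Python 3 (print as a function, range instead of xrange), with one small change: waiting on result.get() before draining the queue replaces the arbitrary time.sleep(1), so no progress messages can be missed. The function and variable names mirror the answer's code.

```python
import multiprocessing

def task(args):
    count, queue = args
    for i in range(count):
        queue.put("%d mississippi" % i)
    return "Done"

def main():
    manager = multiprocessing.Manager()
    q = manager.Queue()  # a picklable proxy, safe to send to pool workers
    with multiprocessing.Pool() as pool:
        result = pool.map_async(task, [(x, q) for x in range(10)])
        done = result.get()  # block until every task has finished
    # All puts have completed by now, so draining with empty() is safe.
    progress = []
    while not q.empty():
        progress.append(q.get())
    return done, progress

if __name__ == "__main__":
    done, progress = main()
    print(done)
```

In a real GUI you would poll the queue from a timer instead of draining it at the end; the structure of task() and the Manager queue stays the same.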

David
  • +1 and Just a quick note that your question helped me in a problem I had today. I'd found the Manager version of the queue, but my code wasn't working because I was relying on a global. It needs to be passed as a parameter, like you are doing. – winwaed Jan 30 '11 at 22:31
8

Making q global works...:

import multiprocessing, time

q = multiprocessing.Queue()

def task(count):
    for i in xrange(count):
        q.put("%d mississippi" % i)
    return "Done"

def main():
    pool = multiprocessing.Pool()
    result = pool.map_async(task, range(10))
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

If you need multiple queues, e.g. to avoid mixing up the progress of the various pool processes, a global list of queues should work. Each process will then need to know which index in the list to use, but that's fine to pass as an argument ;-).
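A minimal sketch of that global-list-of-queues idea, with the caveat from the comments below made explicit: globals are only inherited when workers are forked, so this pins the "fork" start method and will not work on Windows. The names (NUM_TASKS, STEPS, queues) are illustrative, not from the answer.

```python
import multiprocessing

# Globals are inherited by workers only under the "fork" start method
# (available on Unix); this pattern does not work with "spawn".
ctx = multiprocessing.get_context("fork")

NUM_TASKS = 4
STEPS = 3
queues = [ctx.Queue() for _ in range(NUM_TASKS)]

def task(index):
    # Each task reports into its own queue, selected by index.
    for i in range(STEPS):
        queues[index].put("task %d: step %d" % (index, i))
    return index

def main():
    pool = ctx.Pool()
    done = pool.map(task, range(NUM_TASKS))  # blocks until all tasks finish
    pool.close()
    pool.join()
    # Each queue now holds exactly STEPS messages for its task;
    # get(timeout=...) blocks until the feeder threads have flushed them.
    progress = [[q.get(timeout=5) for _ in range(STEPS)] for q in queues]
    return done, progress
```

Because each task owns a queue, the per-task progress streams never interleave, which is the point of the list.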

Alex Martelli
  • Will this work if the "task" is defined in a different module or package? The example code is very simplified. The real program has an MVC architecture where a producer-consumer pipeline is set up across multiple cores (the model) and it needs to send progress updates to the wxPython GUI (the View). – David Jul 10 '10 at 05:20
  • 2
    @David, you can try; if your real code doesn't work in this simple way, you'll need to move up a notch in complexity and go for a Manager (which can give you proxies to Queues, etc). – Alex Martelli Jul 10 '10 at 05:32
This doesn't seem to work at all: q never returns anything, and q.empty() is always True on my machine. Even if I increase the sleep call to 10 seconds, which should be ample time for the task to put a few messages on the queue, q.empty() still returns True. – David Jul 11 '10 at 04:27
  • @David, by "This", do you mean the code I posted in my A? Because that code works fine for me on a dual-core macbook with OSX 10.5, Python 2.6.5 or 2.7. What's your platform? – Alex Martelli Jul 11 '10 at 06:24
  • Yes, I copied the code you posted and while the results are returned (I get 10 "Done" in a list) nothing is ever returned by the Queue. The debugger shows that q always returns q.empty() == True. Windows 7, ActivePython 2.6.5.12 – David Jul 12 '10 at 04:27
  • @David, it does work fine on the Mac, as I said (no Windows to check there). Ah well, then it looks like a Manager is the only viable approach for you. – Alex Martelli Jul 12 '10 at 14:05
  • 1
There is a good reason that this works on Macs and not Windows: the default start method for new processes on Windows is spawn, as fork is not available, so child processes do not inherit the parent's globals. – micsthepick May 22 '18 at 22:56
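A common workaround that works under spawn as well as fork (not from this thread; the names init_worker and worker_queue are illustrative) is to hand the queue to each worker through the Pool's initializer. The initializer's arguments are passed as process-construction arguments, which is the one context in which a multiprocessing.Queue may be pickled, so each worker receives the real queue regardless of start method.

```python
import multiprocessing

def init_worker(q):
    # Runs once in each worker process; stash the queue in a
    # worker-side global for task() to use.
    global worker_queue
    worker_queue = q

def task(count):
    for i in range(count):
        worker_queue.put("%d mississippi" % i)
    return "Done"

def main():
    q = multiprocessing.Queue()
    # initargs travels with the worker Process constructor, so the
    # queue is shared "through inheritance" as the RuntimeError demands.
    pool = multiprocessing.Pool(initializer=init_worker, initargs=(q,))
    done = pool.map(task, range(10))
    # The tasks put 0 + 1 + ... + 9 = 45 messages in total.
    messages = [q.get(timeout=5) for _ in range(45)]
    pool.close()
    pool.join()
    return done, messages
```

Unlike the global-variable version, this does not rely on fork semantics, so the same code behaves identically on Windows, macOS, and Linux.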