
I'm using Python 2.7 and trying to run some CPU-heavy tasks in their own processes. I would like to be able to send messages back to the parent process to keep it informed of the current status of each task. The multiprocessing Queue seems perfect for this, but I can't figure out how to get it to work.

So, this is my basic working example minus the use of a Queue.

import multiprocessing as mp
import time

def f(x):
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print str(results.next())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

I've tried passing the Queue in several ways, and each attempt fails with "RuntimeError: Queue objects should only be shared between processes through inheritance". Here is one of the ways I tried, based on an earlier answer I found. (I get the same problem with Pool.map_async and Pool.imap.)

import multiprocessing as mp
import time

def f(args):
    x = args[0]
    q = args[1]
    q.put(str(x))
    time.sleep(0.1)
    return x*x

def main():
    q = mp.Queue()
    pool = mp.Pool()
    # Fails here: the Queue gets pickled as part of each task's
    # arguments, raising the RuntimeError quoted above
    results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))

    print str(q.get())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Finally, the crudest approach (making the queue global) doesn't generate any messages; it just locks up.

import multiprocessing as mp
import time

q = mp.Queue()

def f(x):
    q.put(str(x))
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print q.get()

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

I'm aware that this would probably work with multiprocessing.Process directly, and that there are other libraries that could accomplish this, but I hate to back away from standard library functions that are otherwise a great fit until I'm sure it's not just my lack of knowledge keeping me from exploiting them.
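
For reference, this is roughly the direct multiprocessing.Process approach I mean (an untested sketch; the queue is passed at construction time, so the children inherit it properly):

import multiprocessing as mp

def f(x, q):
    q.put(str(x))

def main():
    q = mp.Queue()
    # Each child gets the queue through Process's args, which is
    # the sanctioned "inheritance" path the RuntimeError refers to
    procs = [mp.Process(target=f, args=(i, q)) for i in range(1, 6)]
    for p in procs:
        p.start()

    for _ in range(1, 6):
        print q.get()

    for p in procs:
        p.join()

if __name__ == '__main__':
    main()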

Thanks.


2 Answers


The trick is to pass the Queue as an argument to the pool's initializer. It appears to work with all of the Pool dispatch methods.

import multiprocessing as mp

def f(x):
    f.q.put('Doing: ' + str(x))
    return x*x

def f_init(q):
    # Runs once in each worker process, saving the queue
    # as an attribute on the function object f
    f.q = q

def main():
    jobs = range(1, 6)

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])  # processes=None, initializer=f_init, initargs=[q]
    results = p.imap(f, jobs)
    p.close()

    for i in range(len(jobs)):
        print q.get()
        print results.next()

if __name__ == '__main__':
    main()
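
If setting an attribute on the function object feels too hacky, the same trick works with a module-level global set by the initializer. A sketch of that variant (the name _q is mine, same Python 2 style as above):

import multiprocessing as mp

_q = None  # set in each worker process by the initializer

def f_init(q):
    global _q
    _q = q

def f(x):
    _q.put('Doing: ' + str(x))
    return x*x

def main():
    jobs = range(1, 6)

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, jobs)
    p.close()

    for i in range(len(jobs)):
        print q.get()
        print results.next()

if __name__ == '__main__':
    main()
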
  • Very nice demonstration of the purpose and usefulness of the `initializer` and `initargs` arguments to `multiprocessing.Pool`! – Chris Arndt Jan 22 '12 at 20:27
  • Could you please explain why it works? What happens when you do f.q = q? – kepkin Jul 12 '12 at 19:26
  • @kepkin In Python, every function is an object (see http://docs.python.org/reference/datamodel.html#the-standard-type-hierarchy, "Callable types"). Therefore, f.q is setting an attribute named q on the function object f. It was just a quick and lightweight way to save the Queue object for use later. – Olson Jul 17 '12 at 22:31
  • Isn't f.q = q an example of a monkey patch? http://stackoverflow.com/questions/5626193/what-is-monkey-patch – Matthew Cornell Dec 17 '13 at 17:43
  • This allowed me to apply the multiprocessing logging pattern (http://plumberjack.blogspot.com.au/2010/09/using-logging-with-multiprocessing.html) to async methods. – Jaxor24 Apr 23 '15 at 04:04
  • @Olson I want to thank you for this answer. I had been fighting this for days to no avail. It now works, thanks to you. – Matt Nov 24 '20 at 01:53
  • Such dirty code: global state, assigning vars to a function object. – iperov Feb 27 '22 at 19:07
  • Why does it work when assigning the Queue during initialization but not when passing the Queue as an argument? – Leonardus Chen Feb 28 '22 at 10:07

With the fork start method (i.e., on Unix platforms), you do NOT need the initializer trick from the top answer.

Just define the mp.Queue as a global variable and it will be correctly inherited by the child processes.

The OP's example works fine with Python 3.9.7 on Linux (code slightly adjusted):

import multiprocessing as mp
import time

q = mp.Queue()


def f(x):
    q.put(str(x))
    return x * x


def main():
    pool = mp.Pool(5)
    pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    for _ in range(1, 6):
        print(q.get())

    pool.close()
    pool.join()


if __name__ == '__main__':
    main()

Output:

2
1
3
4
5

It's been 12 years, but I'd like to make sure any Linux user who comes across this question knows that the top answer's trick is only needed if you cannot use fork.
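
If your platform defaults to spawn but supports fork (e.g., macOS since Python 3.8), you can request fork explicitly through a context. A minimal sketch, assuming a Unix platform (note that fork has known caveats on macOS):

import multiprocessing as mp
import time

# Request the fork start method explicitly; raises ValueError
# on platforms without fork support (e.g., Windows)
ctx = mp.get_context('fork')
q = ctx.Queue()


def f(x):
    q.put(str(x))
    return x * x


def main():
    pool = ctx.Pool(5)
    pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    for _ in range(1, 6):
        print(q.get())

    pool.close()
    pool.join()


if __name__ == '__main__':
    main()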
