I am really frustrated. Why doesn't Python's multiprocessing Pool.apply_async() actually START the process when a queue object is passed as an argument, or as part of one?

This code works as expected:

#! /usr/bin/env python3

import multiprocessing
import queue
import time

def worker(var):
    while True:
        print("Worker {}".format(var))
        time.sleep(2)

pool = multiprocessing.Pool(20)
m = multiprocessing.Manager()
q = queue.Queue()

for i in range(20):
    pool.apply_async(worker, (i,))

print("kicked off workers")
pool.close()
pool.join()

But just by additionally passing the queue q, nothing happens when you run this version:

#! /usr/bin/env python3

import multiprocessing
import queue
import time

def worker(var,q):
    while True:
        print("Worker {}".format(var))
        time.sleep(2)

pool = multiprocessing.Pool(20)
m = multiprocessing.Manager()
q = queue.Queue()

for i in range(20):
    pool.apply_async(worker, (i,q))

print("kicked off workers")
pool.close()
pool.join()

Again, super frustrating. What the hell is going on? What am I doing wrong?

1 Answer

When you want to share a Queue between processes, you have to create a proxy for one with multiprocessing.managers.SyncManager.Queue, which is what multiprocessing.Manager().Queue() gives you. The reason your second snippet appears to do nothing is that a plain queue.Queue can't be pickled (it contains thread locks), so sending the task to the pool fails; apply_async() stashes that exception in the AsyncResult it returns, and since you never call get() on it, the failure is silently swallowed.

import multiprocessing
import time


def worker(var, q):
    while True:
        print("Worker {}".format(var))
        time.sleep(2)


if __name__ == '__main__':  # Be sure to include this so child processes don't re-execute the module-level setup.

    pool = multiprocessing.Pool(20)
    mgr = multiprocessing.Manager()
    q = mgr.Queue()  # Proxy for a queue.Queue that lives in the manager process.

    for i in range(20):
        pool.apply_async(worker, (i,q))

    print("kicked off workers")
    pool.close()

    print('joining pool')
    pool.join()

    print('done')
  • Thanks @martineau, but in my real code, using a manager runs me into this bug: https://bugs.python.org/issue30256 – Locane Sep 22 '19 at 08:11
  • Michael: Are any of the suggestions in the accepted answer to [multiprocessing.Manager nested shared objects doesn't work with Queue](https://stackoverflow.com/questions/56716470/multiprocessing-manager-nested-shared-objects-doesnt-work-with-queue) feasible? Also, even though it may not matter on your OS, be sure to add the `if __name__ == '__main__':` guard to your code if you don't have one. – martineau Sep 22 '19 at 13:36
  • Perhaps you should post a new question that reproduces the problem ([mre]) and see if someone can devise a workaround. Be sure to specify exactly what version of Python you're using, too, since that seems to bear on the problem. – martineau Sep 22 '19 at 13:44
  • Also, deriving your own `Manager` subclass to handle `Queue`s probably isn't too hard (see for example [this answer](https://stackoverflow.com/a/55189205/355230)), so doing that might be a workaround; there's a rough sketch of the idea after these comments. – martineau Sep 22 '19 at 13:53
  • I managed to get my code working with the managed queue by simplifying the object that was being passed to subprocesses. Now, however, only 40 or so of the 115 processes I'm trying to create are actually running. I get the same problem with my sample code above if I set the pool size to 200. Should I open a new question? – Locane Sep 22 '19 at 17:45
  • It seems that using the Process() object directly in a list instead of letting Pool() handle it lets me get all 200 processes going (see the second sketch below). – Locane Sep 22 '19 at 17:51
  • Frankly, a `Pool` doesn't seem like a good fit for a bunch of processes running infinite loops, so if rolling your own works, I would go with that. Note that running 200 concurrent processes would put a tremendous load on the OS and CPU, so you might want to think of a better way to accomplish whatever it is you're doing. – martineau Sep 22 '19 at 18:21
  • I appreciate your expert advice, martineau. Why is a pool not a good fit for running infinite loops in subprocesses? – Locane Sep 23 '19 at 01:48
  • I think they're better suited to a worker function that does some processing and terminates, perhaps putting its result in a `Queue`. The pattern I often see is a relatively small `Pool` being handed a large number of asynchronous tasks to work on, with the `Pool` keeping the specified maximum number of workers busy without ever exceeding it, because too many running at once can actually slow things down (see the last sketch below). – martineau Sep 23 '19 at 07:01
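
Regarding the comment about deriving your own Manager subclass: here is a minimal sketch of that idea, not a definitive implementation. The QueueManager class name and the SharedQueue typeid are made up for illustration; the sketch just registers the plain queue.Queue class on a SyncManager subclass so the manager process serves proxies to it.

import multiprocessing
import queue
from multiprocessing.managers import SyncManager


class QueueManager(SyncManager):
    """Manager subclass used only to serve queue proxies."""


# Register plain queue.Queue under a custom typeid; each call to
# mgr.SharedQueue() creates a queue inside the manager process and
# returns a picklable proxy to it.
QueueManager.register("SharedQueue", queue.Queue)


def worker(var, q):
    q.put("hello from worker {}".format(var))


if __name__ == '__main__':
    mgr = QueueManager()
    mgr.start()
    q = mgr.SharedQueue()

    pool = multiprocessing.Pool(4)
    for i in range(4):
        pool.apply_async(worker, (i, q))
    pool.close()
    pool.join()

    while not q.empty():
        print(q.get())
    mgr.shutdown()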
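
Next, a rough sketch of the "roll your own" alternative mentioned in the comments, assuming the workers can simply run until the parent exits: one multiprocessing.Process per worker kept in a plain list, with no Pool involved. The count of 20 and the daemon/drain behaviour are illustrative choices, not taken from the original code.

import multiprocessing
import time


def worker(var, q):
    while True:
        q.put("Worker {}".format(var))
        time.sleep(2)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    q = mgr.Queue()

    # One Process per worker instead of a Pool; daemon=True means they are
    # terminated automatically when the parent process exits.
    procs = [multiprocessing.Process(target=worker, args=(i, q), daemon=True)
             for i in range(20)]
    for p in procs:
        p.start()
    print("kicked off workers")

    # Consume a handful of messages from the shared queue, then exit.
    for _ in range(5):
        print(q.get())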
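
Finally, a minimal sketch of the usual Pool pattern described in the last comment: a small pool working through many short-lived tasks, with the parent collecting the results. The squaring task and the pool/task sizes are arbitrary examples.

import multiprocessing


def task(n):
    # Do a finite amount of work and return the result.
    return n * n


if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:  # small, fixed number of workers
        results = [pool.apply_async(task, (n,)) for n in range(100)]
        total = sum(r.get() for r in results)
    print(total)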