I dislike the idea of bypassing Pool's internal queues by overlaying it with external queues. IMO it leads to many more moving parts and unnecessary complexity, and you easily end up creating hard-to-detect race conditions. Pool alone is fairly complex under the hood, and bloating the amount of code that runs with even more piping on top is something I'd rather avoid (KISS). That approach uses Pool only for its side effect of managing worker processes, and if at all, I'd only consider it for one-off code, not for a system built for stability or possibly evolving needs.
To give you some comparison for the complexity argument: Pool employs three worker threads just to manage workers and funnel data back and forth. Together with the main thread this makes four threads in the parent process. The non-Pool version provided below, on the other hand, runs on two threads in the parent (multiprocessing.Queue starts a feeder thread upon first usage of .put()). The ~60 lines of the non-Pool solution also compare to ~900 lines of multiprocessing.pool.py alone; a good portion of the latter will run anyway, only to shuffle around Nones instead of your actual results.
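If you want to verify that thread count yourself, here is a purely illustrative look at the parent's threads while a Pool is open:

import threading
import multiprocessing as mp

if __name__ == '__main__':
    with mp.Pool(processes=4):
        # besides MainThread, Pool runs three handler threads for
        # managing workers, dispatching tasks and collecting results
        for thread in threading.enumerate():
            print(thread.name)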
Pool is great for the frequent use case of processing function tasks as a whole; retrieving sub-task results from generators just does not fit the bill here.
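To illustrate what "as a whole" means, the Pool-native way would be to let each task return its complete result at once, for example with imap_unordered. This is only a sketch, reusing generate() from the question; sub-results arrive in the parent only after a whole task has finished:

import multiprocessing as mp

# def generate(x): ...  # take from the question


def collect(x):
    # materializes the whole sub-result before it travels back to the parent
    return list(generate(x))


if __name__ == '__main__':
    with mp.Pool(processes=4) as pool:
        for chunk in pool.imap_unordered(collect, range(0, 100, 10)):
            for res in chunk:
                print(res)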
Using multiprocessing.Pool
Now if you are determined to go ahead with this approach anyway, at least don't use Manager for it. There's hardly any reason to reach for Manager on a single node, and the only case I can think of where you really need manager-queues is when you have to send a queue reference to an already running process. Since Pool's initializer() allows you to pass arguments at worker-process startup, manager-queues are not necessary here either.
You have to be aware that every interaction between parent and child process through Manager proxies takes a detour through an extra manager process, increasing latency through additional IPC, context switches and cache flushes. It also has the potential to considerably increase the memory footprint.
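If you want to see that extra process for yourself, a quick check (purely illustrative, not needed for the solution) is:

import multiprocessing as mp

if __name__ == '__main__':
    print(len(mp.active_children()))  # 0 -- no helper processes yet
    manager = mp.Manager()            # starts a separate manager process
    print(len(mp.active_children()))  # 1 -- the manager process is now running
    queue = manager.Queue()           # every .put()/.get() is proxied through that process

A plain mp.SimpleQueue passed in through Pool's initializer avoids that detour: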
import time
import multiprocessing as mp

# def generate(x): ...  # take from the question


def init_queue(queue):
    # make the queue available as a global in every worker process
    globals()['queue'] = queue


def wrapper(x):
    q = queue
    for item in generate(x):
        q.put(item)


if __name__ == '__main__':

    POISON = 'POISON'
    queue = mp.SimpleQueue()

    with mp.Pool(processes=4, initializer=init_queue, initargs=(queue,)) as pool:
        pool.map_async(
            func=wrapper,
            iterable=range(0, 100, 10),
            chunksize=1,
            # the callback runs in the parent's result-handler thread once all
            # tasks have finished, so it can safely send the sentinel
            callback=lambda _: queue.put(POISON)
        )
        for res in iter(queue.get, POISON):
            print(res)
Using multiprocessing.Process
Now the alternative I would prefer over using Pool in this case is building your own little specialized pool with multiprocessing.Process and a couple of multiprocessing queues. Yes, it's a bit more code to write, but the amount of code that actually runs is considerably reduced compared to any solution involving multiprocessing.Pool. This leaves less room for subtle bugs, plus it's more open to changing requirements and uses fewer system resources.
import time
import multiprocessing as mp
from itertools import chain

DONE = 'DONE'
POISON = 'POISON'


def _worker(func, inqueue, outqueue):
    for chunk in iter(inqueue.get, POISON):
        for res in func(chunk):
            outqueue.put(res)
        outqueue.put(DONE)
    outqueue.put(POISON)


def _init_pool(n_workers, func):
    """Initialize worker-processes and queues."""
    inqueue, outqueue = mp.Queue(), mp.Queue()
    pool = [
        mp.Process(target=_worker, args=(func, inqueue, outqueue))
        for _ in range(n_workers)
    ]
    for p in pool:
        p.start()
    return pool, inqueue, outqueue


def iflatmap(n_workers, func, iterable):
    """Yield results from subprocesses unordered and immediately."""
    iterable = chain(iterable, [POISON] * n_workers)
    pool, inqueue, outqueue = _init_pool(n_workers, func)
    for _ in pool:
        inqueue.put(next(iterable))
    while n_workers:
        res = outqueue.get()
        if res == DONE:  # there's a free worker now
            inqueue.put(next(iterable))
        elif res == POISON:  # a worker has shut down
            n_workers -= 1
        else:
            yield res
    for p in pool:
        p.join()
The example here uses only four workers to show that this solution (also) doesn't rely on having the same number of workers and tasks. Since it also doesn't need to know the length of the input iterable for its control flow, it allows for passing a generator of unknown length as iterable. If you prefer it classy, you can wrap the logic above in a "Pool"-class instead (see the sketch after the usage example below).
def generate(x):
    for j in range(x, x + 10):
        yield j
        time.sleep(1)


if __name__ == '__main__':

    for res in iflatmap(n_workers=4, func=generate, iterable=range(0, 100, 10)):
        print(res)
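For the "classy" variant mentioned above, a thin wrapper could look like this (just a sketch; the name GeneratorPool is made up):

class GeneratorPool:
    """Thin class wrapper around iflatmap() above."""

    def __init__(self, n_workers):
        self.n_workers = n_workers

    def iflatmap(self, func, iterable):
        # workers are started and joined per call, exactly as in the function above
        return iflatmap(self.n_workers, func, iterable)


# usage:
# for res in GeneratorPool(n_workers=4).iflatmap(generate, range(0, 100, 10)):
#     print(res)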
For people unfamiliar with the usage of iter(object, sentinel): docs and some reasoning here.
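In short, iter(callable, sentinel) keeps calling the callable until it returns the sentinel, which is exactly how the loops above drain the queues. A quick toy example:

import queue

q = queue.Queue()
for item in (1, 2, 3, 'STOP'):
    q.put(item)

for item in iter(q.get, 'STOP'):  # calls q.get() repeatedly until it returns 'STOP'
    print(item)                   # prints 1, 2, 3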