I want to implement the producer-consumer pattern using multiprocessing.pool.Pool. Since a JoinableQueue cannot be used with a Pool (it raises RuntimeError: JoinableQueue objects should only be shared between processes through inheritance), I have to use multiprocessing.Manager(), inspired by this answer.
The problem is: the program may hang when there are more consumer jobs than producer jobs.
import queue
import random
from multiprocessing import Manager, Pool


def consumer(q):
    while True:
        try:
            res = q.get(block=False)
            if res is None:
                break
            print(f'Consume {res}')
        except queue.Empty:
            pass


def producer(q, food):
    for i in range(2):
        res = f'{food} {i}'
        print(f'Produce {res}')
        q.put(res)
    q.put(None)  # sentinel


if __name__ == "__main__":
    with Pool() as pool:
        jobs = 2
        foods = ['apple', 'banana', 'melon', 'salad']
        q = Manager().Queue()
        [pool.apply_async(func=consumer, args=(q, )) for _ in range(jobs + 1)]  # hangs
        # does not hang only when the number of consumer jobs is less than or equal to the number of producer jobs:
        # [pool.apply_async(func=consumer, args=(q, )) for _ in range(jobs)]
        [
            pool.apply_async(func=producer, args=(q, random.choice(foods)))
            for _ in range(jobs)
        ]
        pool.close()
        pool.join()
It seems the extra consumers never receive a sentinel and wait forever.
So what is the elegant way to implement the producer-consumer pattern with multiprocessing.pool.Pool?
Or is it only possible with multiprocessing.Process + JoinableQueue?