I'm writing a Python program that runs forever and receives requests at unpredictable times, which have to be processed in parallel. Each request can take dozens of minutes to process and is CPU-bound, so asyncio is not an option. For each request I start a new worker process.
The problem is that if I don't call join() on a worker after it has finished, it turns into a zombie process. My current solution is to iterate over all worker processes at regular intervals and call join() on any that have finished. Is there a more elegant way than using a timeout on multiprocessing.Queue.get()? Maybe an event-driven approach? Or is using a timeout totally fine in this case?

Please see the following code for my current solution.
#!/usr/bin/env python3
import multiprocessing as mp
import queue
import random
import time
from typing import List


def main():
    q = mp.Queue()
    p_produce = mp.Process(target=produce, args=(q,))
    p_receive = mp.Process(target=receive, args=(q,))
    p_produce.start()
    p_receive.start()
    p_receive.join()
    p_produce.join()


def produce(q: mp.Queue):
    # Simulate incoming requests by putting them on the queue at random intervals.
    for i in range(10):
        print(f"put({i})")
        q.put(str(i))
        time.sleep(random.uniform(2.0, 3.0))
    q.put("EOF")


def receive(q: mp.Queue):
    workers = []  # type: List[mp.Process]
    while True:
        # Reap finished workers so they don't linger as zombies.
        to_join = [w for w in workers if not w.is_alive()]
        for p_worker in to_join:
            print(f"Join {p_worker.name}")
            p_worker.join()
            workers.remove(p_worker)
        try:
            request = q.get(block=True, timeout=1)  # Is there a better way?
        except queue.Empty:
            continue
        if request == "EOF":
            break
        # Start a dedicated worker process for each request.
        p_worker = mp.Process(target=worker, args=(request,), name=request)
        p_worker.start()
        workers.append(p_worker)
    # Wait for the remaining workers before shutting down.
    for p_worker in workers:
        print(f"Join {p_worker.name}")
        p_worker.join()


def worker(name: str):
    # Simulate a long-running, CPU-heavy request handler.
    print(f"Working on {name}")
    time.sleep(random.uniform(2.0, 3.0))


if __name__ == "__main__":
    main()
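
To make the "event driven" part of my question more concrete, here is a rough sketch of what I imagine such a variant could look like. This is an assumption on my part, not something I know to be idiomatic: it swaps the Queue for a Pipe so that multiprocessing.connection.wait() can block on both the request connection and the workers' sentinel attributes in a single call, and joins each worker as soon as its sentinel becomes ready instead of polling with a timeout.

#!/usr/bin/env python3
# Rough sketch of the "event driven" idea (an assumption, not a known-good
# pattern): requests arrive over a Pipe instead of a Queue, so
# multiprocessing.connection.wait() can block on "new request" and
# "worker finished" (via Process.sentinel) at the same time.
import multiprocessing as mp
from multiprocessing.connection import wait
import random
import time
from typing import Dict


def produce(conn) -> None:
    # Same simulated request source as above, but sending over a Pipe.
    for i in range(10):
        print(f"send({i})")
        conn.send(str(i))
        time.sleep(random.uniform(2.0, 3.0))
    conn.send("EOF")


def worker(name: str) -> None:
    print(f"Working on {name}")
    time.sleep(random.uniform(2.0, 3.0))


def receive(conn) -> None:
    # Map each worker's sentinel to its Process object.
    workers = {}  # type: Dict[int, mp.Process]
    eof = False
    while not eof or workers:
        waitables = list(workers)   # worker sentinels
        if not eof:
            waitables.append(conn)  # ...plus the request pipe
        for obj in wait(waitables):  # blocks until something happens
            if obj is conn:
                request = conn.recv()
                if request == "EOF":
                    eof = True
                    continue
                p = mp.Process(target=worker, args=(request,), name=request)
                p.start()
                workers[p.sentinel] = p
            else:
                # A worker's sentinel became ready: the process has exited,
                # so join it right away.
                p = workers.pop(obj)
                print(f"Join {p.name}")
                p.join()


def main() -> None:
    recv_end, send_end = mp.Pipe(duplex=False)
    p_produce = mp.Process(target=produce, args=(send_end,))
    p_receive = mp.Process(target=receive, args=(recv_end,))
    p_produce.start()
    p_receive.start()
    p_receive.join()
    p_produce.join()


if __name__ == "__main__":
    main()

If the timeout-based polling in my current code is considered fine, I'm happy to keep it; I mainly want to know whether something along these lines would be preferable.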