2

I'm writing a program in Python that runs forever and randomly receives requests that have to be processed in parallel. Each request can take dozens of minutes to process and puts some burden on the CPU, so asyncio is not an option. For each request I start a new worker process.

The problem is, that if I don't call join() on a worker after it's finished, it turns into a zombie process.

My current solution is to regularly iterate over all worker processes and call join() on them if they are finished. Is there a more elegant way than using a timeout on multiprocessing.Queue.get()? Maybe an event driven approach? Or is using a timeout totally fine in this case? Please see the following code for my current solution.

#!/usr/bin/env python3

import multiprocessing as mp
import queue
import random
import time
from typing import List


def main():
    q = mp.Queue()
    p_produce = mp.Process(target=produce, args=(q,))
    p_receive = mp.Process(target=receive, args=(q,))

    p_produce.start()
    p_receive.start()

    p_receive.join()
    p_produce.join()


def produce(q: mp.Queue):
    for i in range(10):
        print(f"put({i})")
        q.put(str(i))
        time.sleep(random.uniform(2.0, 3.0))
    q.put("EOF")


def receive(q: mp.Queue):
    workers = []  # type: List[mp.Process]
    while True:
        to_join = [w for w in workers if not w.is_alive()]
        for p_worker in to_join:
            print(f"Join {p_worker.name}")
            p_worker.join()
            workers.remove(p_worker)

        try:
            request = q.get(block=True, timeout=1)  # Is there a better way?
        except queue.Empty:
            continue

        if request == "EOF":
            break

        p_worker = mp.Process(target=worker, args=(request,), name=request)
        p_worker.start()
        workers.append(p_worker)

    for p_worker in workers:
        print(f"Join {p_worker.name}")
        p_worker.join()


def worker(name: str):
    print(f"Working on {name}")
    time.sleep(random.uniform(2.0, 3.0))


if __name__ == "__main__":
    main()
Marc P.
  • 672
  • 6
  • 11

2 Answers2

4

As @Giannis suggested in a comment, you're reinventing a process manager from scratch. Sticking to what comes with Python, do you have some objection to using multiprocessing.Pool? If so, what?

The usual way to do this is to pick a maximum number of worker processes you want to run simultaneously. Say,

NUM_WORKERS = 4

Then drop this in as a replacement for your receive() function:

def receive(q: mp.Queue):
    pool = mp.Pool(NUM_WORKERS)
    while True:
        request = q.get()
        if request == "EOF":
            break
        pool.apply_async(worker, args=(request,))
    pool.close()
    pool.join()

The NUM_WORKERS processes are created once, and reused across tasks. If for some reason you need (or want) a brand new process for each task, you only need to add maxtasksperchild=1 to the Pool constructor.

And if for some reason you need to know when each task finishes, you could, e.g., add a callback= argument to the apply_async() call and write a little function that will be called when the task ends (and it will receive, as argument, whatever your worker() function returns).

The devil's in the daemons

So it turns out your worker processes in your real app want to (for whatever reasons) create processes of their own, and processes created by Pool can't do that. They're created as "daemon" processes. From the docs:

When a process exits, it attempts to terminate all of its daemonic child processes.

Note that a daemonic process is not allowed to create child processes. Otherwise a daemonic process would leave its children orphaned if it gets terminated when its parent process exits.

Pretty much clear as mud ;-) Here's an elaborate way to roll your own Pool workalike that creates non-daemon processes, but too elaborate for my tastes:

Python Process Pool non-daemonic?

Going back to your original design, which you already know works, I'd just change it to separate the logic of periodically joining worker processes from the logic of manipulating the queue. Logically, they really have nothing to do with each other. Specifically, creating a "background thread" to join makes good sense to me:

def reap(workers, quit):
    from time import sleep
    while not quit.is_set():
        to_join = [w for w in workers if not w.is_alive()]
        for p_worker in to_join:
            print(f"Join {p_worker.name}")
            p_worker.join()
            workers.remove(p_worker)
        sleep(2)  # whatever you like
    for p_worker in workers:
        print(f"Join {p_worker.name}")
        p_worker.join()

def receive(q: mp.Queue):
    import threading
    workers = []  # type: List[mp.Process]
    quit = threading.Event()
    reaper = threading.Thread(target=reap, args=(workers, quit))
    reaper.start()
 
    while True:
        request = q.get()
        if request == "EOF":
            break
        p_worker = mp.Process(target=worker, args=(request,), name=request)
        p_worker.start()
        workers.append(p_worker)

    quit.set()
    reaper.join()

I happen to know that list.append() and list.remove() are thread-safe in CPython, so there's no need to protect those operations with a lock. But it wouldn't hurt if you added one.

And one more to try

While processes created by Pool are daemonic, it seems that processes created by the similar concurrent.futures.ProcessPoolExecutor are not. So this simple variation of my first suggestion may work for you (or may not ;-) ):

NUM_WORKERS = 4

def receive(q: mp.Queue):
    import concurrent.futures as cf
    with cf.ProcessPoolExecutor(NUM_WORKERS) as e:
        while True:
            request = q.get()
            if request == "EOF":
                break
            e.submit(worker, request)

If that does work for you, it's hard to imagine anything materially simpler.

Community
  • 1
  • 1
Tim Peters
  • 67,464
  • 13
  • 126
  • 132
  • Thanks for the hint, Tim, I like your solution. I managed to run it with a toy example. Unfortunately some library I'm using doesn't play along with `mp.Pool`. I can see in my log files that the worker process starts the target function and it runs a bit but then always halts at the same statement without any error message. Strangely with my polling approach it works :( – Marc P. Mar 31 '17 at 14:16
  • I plugged in a callback for `error_callback` in `apply_async` and get the following error message: "daemonic processes are not allowed to have children". – Marc P. Mar 31 '17 at 14:45
  • Ugh. Too messy for a comment - edited the answer instead with a new section at the end. – Tim Peters Mar 31 '17 at 17:34
  • @TimPeters, your last try with `concurrent.futures.ProcessPoolExecutor` did the trick :D I hope the guys who develop the Python language will not make the workers created by `ProcessPoolExecutor` daemonic in the future. After all, daemons are evil, right? ;) – Marc P. Mar 31 '17 at 23:29
2

Well, one solution would be using a workqueue like python rq or selery. Essentially you would spawn n workers (seperate processes) which would look at a queue for tasks to execute and then on your main program you would just push tasks in the queue and check for the results periodically.

Giannis Spiliopoulos
  • 2,628
  • 18
  • 27
  • Thanks for pointing me to Celery and the like. Probably I'll use it in the future if I have to scale the application. – Marc P. Mar 31 '17 at 14:17