26

I have code that needs to run against several external systems that may hang or fail in ways outside my control. I would like to use Python's multiprocessing to spawn child processes that run independently of the main program, and then terminate them when they hang or have problems, but I am not sure of the best way to go about this.

When terminate is called it does kill the child process, but the child then becomes a defunct zombie that is not released until the process object is gone. The example code below, where the loop never ends, works to kill the process and allow a respawn when run() is called again, but it does not seem like a good way of going about this (i.e. it would be better to create the multiprocessing.Process() in __init__()).

Anyone have a suggestion?

import multiprocessing
import time

class Process(object):
    def __init__(self):
        self.thing = Thing()
        self.running_flag = multiprocessing.Value("i", 1)

    def run(self):
        self.process = multiprocessing.Process(target=self.thing.worker, args=(self.running_flag,))
        self.process.start()
        print(self.process.pid)

    def pause_resume(self):
        self.running_flag.value = not self.running_flag.value

    def terminate(self):
        self.process.terminate()

class Thing(object):
    def __init__(self):
        self.count = 1

    def worker(self, running_flag):
        while True:
            if running_flag.value:
                self.do_work()

    def do_work(self):
        print("working {0} ...".format(self.count))
        self.count += 1
        time.sleep(1)
noxdafox
Dan Littlejohn
  • I think the direct answer is that you don't want to be terminating your processes - as you have found they become zombies that could be holding onto memory or who knows what. In my limited experience with multiprocessing, I was just more vigilant about making workers that could die gracefully. Does [this answer](http://stackoverflow.com/a/19929767/377366) on basically the same topic answer your question? – KobeJohn Aug 17 '15 at 15:37
  • Not quite. I have read a number of threads like that, and they expect an exception to occur that you can then handle. My case is that something is running but hangs. An operator may recognize that something is not running correctly, and I would like a way to stop the process. Since it is hung, it would not respond to something like a stop flag sent through multiprocessing.Value(). Likely what they would do is kill the whole program and then restart it, so I was wondering if there is a good way to do that in the program for just the subprocess to recover. – Dan Littlejohn Aug 17 '15 at 16:10
  • I think I know what you mean, but still the only answer I have found is as above - you just have to do whatever you can to avoid "hanging" (e.g. a catch-all exception wrapping each process). As further evidence, threading is arguably the simpler solution in many cases and [it has the same advice](http://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread-in-python). On that point, just in case you haven't checked - make sure you really need multiprocessing instead of threading, as each has its own benefits and drawbacks. – KobeJohn Aug 17 '15 at 20:18

2 Answers

19

You might run the child processes as daemons in the background.

process.daemon = True

Any errors and hangs (or an infinite loop) in a daemon process will not affect the main process, and the daemon will only be terminated once the main process exits.
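A minimal sketch of that flag (the worker function and the long sleep here are placeholders for a call against an external system that may hang):

```python
import multiprocessing as mp
import time

def flaky_worker():
    # Stands in for a call that may hang indefinitely
    time.sleep(1000)

if __name__ == "__main__":
    p = mp.Process(target=flaky_worker)
    p.daemon = True      # must be set before start()
    p.start()
    print(p.daemon, p.is_alive())
    # When the main process exits here, the daemon child is
    # terminated automatically instead of lingering as an orphan.
```

Note that the daemon flag has to be set before start(); setting it afterwards raises an error.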

This will work for simple problems, until you end up with many child daemon processes that keep consuming memory from the parent process without any explicit control over them.

The best way is to set up queues for the child processes to communicate with the parent process, so that we can join them and clean up nicely. Here is some simple code that checks whether a child process is hanging (simulated with time.sleep(1000)) and sends a message for the main process to act on:

import multiprocessing as mp
import queue
import time

def worker(running_flag, hb_q):
    count = 1
    while True:
        if running_flag.value:
            print(f"working {count} ...")
            count += 1
            hb_q.put(count)          # heartbeat for the watchdog
            time.sleep(1)
            if count > 3:
                # Simulate hanging with sleep
                print("hanging...")
                time.sleep(1000)

def watchdog(hb_q, ctrl_q):
    """
    Checks the heartbeat queue for updates and notifies the main
    process when the worker hasn't sent anything for too long.
    """
    while True:
        try:
            hb_q.get(timeout=10.0)
        except queue.Empty:
            print("[WATCHDOG]: Maybe WORKER is slacking")
            ctrl_q.put("KILL WORKER")

def main():
    """The main process"""
    running_flag = mp.Value("i", 1)
    hb_q = mp.Queue()    # worker -> watchdog heartbeats
    ctrl_q = mp.Queue()  # watchdog -> main control messages

    workr = mp.Process(target=worker, args=(running_flag, hb_q))
    wdog = mp.Process(target=watchdog, args=(hb_q, ctrl_q))

    # run the watchdog as a daemon so it terminates with the main process
    wdog.daemon = True

    workr.start()
    print("[MAIN]: starting process P1")
    wdog.start()

    # Wait for a control message from the watchdog. Using a separate
    # control queue keeps the main process from draining the worker's
    # heartbeats, which would starve the watchdog.
    while True:
        msg = ctrl_q.get()
        if msg == "KILL WORKER":
            print("[MAIN]: Terminating slacking WORKER")
            workr.terminate()
            time.sleep(0.1)
            if not workr.is_alive():
                print("[MAIN]: WORKER is a goner")
                workr.join(timeout=1.0)
                print("[MAIN]: Joined WORKER successfully!")
                ctrl_q.close()
                break  # the watchdog daemon gets terminated with the main process

if __name__ == '__main__':
    main()

Without terminating the worker, an attempt to join() it to the main process would block forever, since the worker never finishes.
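To illustrate that point, a timed join() returns while a hung worker is still alive, whereas a bare join() would block indefinitely (the hang is again simulated with a long sleep):

```python
import multiprocessing as mp
import time

def hang():
    time.sleep(1000)  # simulated hang

if __name__ == "__main__":
    p = mp.Process(target=hang)
    p.start()
    p.join(timeout=1.0)      # returns after one second
    print(p.is_alive())      # True: a plain p.join() would block forever
    p.terminate()
    p.join()                 # reaps the terminated process
```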

Jean-Francois T.
Pandemonium
  • 4
    I think there's a typo in your code. Shouldn't it be `if msg == "KILL WORKER":` – Fra Mar 24 '19 at 17:10
  • 1
    This code kills worker even if it is not hanging. The loop in `main` polls all the messages from the queue and the watchdog always has an empty queue. – Gregory Jan 14 '22 at 05:14
15

The way Python multiprocessing handles processes is a bit confusing.

From the multiprocessing guidelines:

Joining zombie processes

On Unix when a process finishes but has not been joined it becomes a zombie. There should never be very many because each time a new process starts (or active_children() is called) all completed processes which have not yet been joined will be joined. Also calling a finished process’s Process.is_alive will join the process. Even so it is probably good practice to explicitly join all the processes that you start.

To prevent a process from becoming a zombie, you need to call its join() method once you kill it.
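A minimal sketch of that terminate-then-join pattern (the hanging call is simulated with a long sleep):

```python
import multiprocessing as mp
import time

def hang():
    time.sleep(1000)  # simulated hanging call

if __name__ == "__main__":
    p = mp.Process(target=hang)
    p.start()
    p.terminate()      # ask the OS to kill it (SIGTERM on Unix)
    p.join()           # reap it so it does not linger as a zombie
    print(p.exitcode)  # on Unix, a negative code means death by signal
```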

If you want a simpler way to deal with the hanging calls in your system you can take a look at pebble.

noxdafox
  • 2
  • so is it still necessary to call `process.terminate()` to make sure the process is terminated, or should `join()` alone be enough to kill them? – weefwefwqg3 Nov 02 '17 at 16:08
  • 5
  • `terminate` issues a termination request to the target process via a `SIGTERM` signal. It's up to the process to honour the request (in most cases they do). It has no effect if the process has already ended. `join` simply instructs the OS to reclaim the process resources if it has ended; otherwise it blocks until then. `join` must always be called on ended processes, otherwise the OS will pile up the resources of exhausted processes. They are usually referred to as "zombie" processes because they are literally empty shells of once-running ones. – noxdafox Nov 02 '17 at 17:14