
I am writing code to run experiments in parallel. I don't have control over what the experiments do; they may use subprocess.Popen or check_output to spawn one or more additional child processes.

I have two requirements: I want to be able to kill experiments that exceed a timeout, and I want to kill experiments upon KeyboardInterrupt.

Most ways to terminate processes don't ensure that all subprocesses etc. are killed as well. This is obviously a problem when hundreds of experiments run one after the other and each spawns child processes that stay alive after the timeout occurred and the experiment was supposedly killed.

The way I am dealing with this now is to store experiment configurations in a database, generate code that loads and runs experiments from the command line, call these commands via subprocess.Popen(cmd, shell=True, start_new_session=True), and kill them using os.killpg on timeout.

My main question then is: calling these experiments via the command line feels cumbersome, so is there a way to call code directly via multiprocessing.Process(target=fn) while achieving the same effect as start_new_session=True + os.killpg upon timeout and KeyboardInterrupt?

<file1>
def run_exp(config):
    do work
    return result

if __name__ == "__main__":
    save_exp(run_exp(load_config(sys.argv)))

<file2>
def monitor(queue):
    active = set()  # active process ids
    while True:
        msg = queue.get()
        if msg == "sentinel":
             <loop over active ids and kill them with os.killpg>
        else:
            <add or remove id from active set>


def worker(args):
    id, queue = args
    command = f"python <file1> {id}"
    with subprocess.Popen(command, shell=True, ..., start_new_session=True) as process:
        try:
            queue.put(f"start {process.pid}")
            process.communicate(timeout=timeout)
        except TimeoutExpired:
            os.killpg(process.pid, signal.SIGINT)  # send signal to the process group
            process.communicate()
        finally:
            queue.put(f"done {process.pid}")

def main():
    <save configs => c_ids>
    queue = manager.Queue()
    process = Process(target=monitor, args=(queue,))
    process.start()

    def clean_exit():
        queue.put("sentinel")
        <terminate pool and monitor process>

    r = pool.map_async(worker, [(c_id, queue) for c_id in c_ids])
    atexit.register(clean_exit)
    r.wait()
    <terminate pool and monitor process>

I posted a skeleton of the code that details the approach of starting processes via the command line and killing them. An additional complication of this version of my approach is that by the time the KeyboardInterrupt arrives, the queue has already been terminated (for lack of a better word), and communicating with the monitor process is impossible (the sentinel message never arrives). Instead I have to resort to writing process ids to a file and reading the file back in the main process to kill the still-running processes. If you know a way to work around this queue issue, I'd be eager to learn about it.

Samuel
    This seems to be related to the timeout part of your question. Does it help? https://stackoverflow.com/questions/29494001/how-can-i-abort-a-task-in-a-multiprocessing-pool-after-a-timeout – VPfB May 23 '19 at 14:06
  • 1
    On Unix, this will create a process group for each worker `pool = mp.Pool(4, initializer=os.setpgrp)`. – VPfB May 23 '19 at 14:08
  • That’s interesting! – Samuel May 28 '19 at 06:39

2 Answers


I think the problem is that you are passing the child's pid to os.killpg, which expects a process-group id, and you used signal.SIGINT, which I think should be signal.SIGTERM. Try this: instead of this line:

os.killpg(process.pid, signal.SIGINT)

use this line:

os.killpg(os.getpgid(process.pid), signal.SIGTERM) 
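For completeness, a runnable POSIX-only demo of this pattern (it assumes a `sleep` binary is available; note that with start_new_session=True the group id already equals the child's pid, so os.getpgid mainly makes the intent explicit):

```python
import os
import signal
import subprocess
import time

# Start a child in its own session; it becomes its own process-group leader.
proc = subprocess.Popen(["sleep", "60"], start_new_session=True)
time.sleep(0.1)  # give the child a moment to start

# With start_new_session=True the group id equals the child's pid.
pgid = os.getpgid(proc.pid)
os.killpg(pgid, signal.SIGTERM)  # terminate the whole group
proc.wait()
print(proc.returncode)  # -15: killed by SIGTERM
```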
Siyavash vaez afshar

I guess one way to avoid this is to use a try/except block.
If the KeyboardInterrupt arrives in main(), you could try this:

def main():
    try:
        <save configs => c_ids>
        queue = manager.Queue()
        process = Process(target=monitor, args=(queue,))
        process.start()

        def clean_exit():
            queue.put("sentinel")
            <terminate pool and monitor process>

        r = pool.map_async(worker, [(c_id, queue) for c_id in c_ids])
        atexit.register(clean_exit)
        r.wait()
        <terminate pool and monitor process>
    except KeyboardInterrupt as e:
        pass
        # handle the processes you want to keep running here

I hope this helps.

  • You mean to avoid closing the queue? – Samuel May 28 '19 at 06:40
  • @Samuel Yes, I guess it will avoid the closing of the queue. As you can see, I have used `pass`, so most of the time your program won't be affected by keystrokes like `Ctrl+C` and will continue. Try it out. – Amazing Things Around You May 28 '19 at 08:24
  • @Samuel You can also exit the complete program when the key interruption arrives; using `sys.exit()` the program will get terminated. It might also be better to try multithreading, which would let you stop all the work initiated within your program as soon as the key interrupt occurs. Might be a better solution. – Amazing Things Around You May 28 '19 at 09:10