
I have an asyncio event loop to which I add a signal handler using `loop.add_signal_handler()`. The signals I want to catch are SIGINT, SIGHUP and SIGTERM, in order to gracefully shut down my event loop.

From this event loop I want to fork processes using `multiprocessing.Process()`. I want to be able to terminate such a process `p` from the event loop with `p.terminate()`. However, the signal handler catches the SIGTERM issued by `p.terminate()`, which triggers my shutdown code while leaving `p` running.

I have not found any solutions to this. Most posts say you should refrain from using termination signals and instead pass a sentinel such as `None` through a `multiprocessing.Queue()` and handle it in the child process. While I see the usefulness and cleanness of this approach, in my case a `multiprocessing.Queue()` is not feasible. Am I attempting something impossible, or have I missed something?
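For completeness, the Queue-based shutdown that those posts recommend looks roughly like this (a minimal sketch, not my actual code; the names `worker` and `SENTINEL` are just for illustration):

```python
import multiprocessing as mp

SENTINEL = None  # value that tells the worker to stop


def worker(q):
    # Consume items until the sentinel arrives, then exit cleanly
    while True:
        item = q.get()
        if item is SENTINEL:
            break
        print("got", item)


if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    q.put("work")
    q.put(SENTINEL)  # request shutdown instead of p.terminate()
    p.join()
    print("exit code:", p.exitcode)  # 0: the child exited normally
```

Because the child exits on its own, no SIGTERM is ever sent, so the parent's signal handler is never involved.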

I have created a minimum example:

import asyncio
import multiprocessing as mp
import signal
import time


class Test():

    def __init__(self):
        self.event_loop = asyncio.get_event_loop()

    def create_mp(self):
        # Wrapper for creating process
        self.p = mp.Process(target=worker)
        self.p.start()
    
    async def shutdown_mp(self):
        # Shutdown after 5 sec
        await asyncio.sleep(5)
        self.p.terminate()

    async def async_print(self):
        # Simple coroutine printing
        while True:
            await asyncio.sleep(1)
            print("Async_test")

    async def shutdown_event_loop(self, signal):
        # Graceful shutdown, inspiration from roguelynn (Lynn Root) 
        print("Received exit signal {}".format(signal.name))
        tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
        # Cancel all tasks
        for task in tasks:
            task.cancel()
        print("Cancelling {} outstanding tasks".format(len(tasks)))
        await asyncio.gather(*tasks, return_exceptions=True)
        self.event_loop.stop()

    def run(self):
        # Add signals
        signals = (signal.SIGINT, signal.SIGHUP, signal.SIGTERM)
        for s in signals:
            self.event_loop.add_signal_handler(
                s, lambda s=s: self.event_loop.create_task(self.shutdown_event_loop(s)))
        # Schedule async task
        self.event_loop.create_task(self.async_print())
        # Start processes
        self.create_mp()
        # Schedule process to be terminated
        self.event_loop.create_task(self.shutdown_mp())
        self.event_loop.run_forever()

def worker():
    # Simple process
    while True:
        print("Test")
        time.sleep(1)

if __name__ == "__main__":
    test = Test()
    test.run()

I have not been able to stop the processes with a simple KeyboardInterrupt either. Instead I use `pgrep python3` and `kill -9 PID`.

    This doesn't work because the internal pipe that the event loop uses to communicate with its signal handler ends up shared between the parent and the child, causing the parent to be notified of a signal that actually happened in the child. `mp.Process` forks by default, and it's not supported to reuse an event loop after `fork()`. See [this answer](https://stackoverflow.com/a/56410123/1600898), perhaps it helps. – user4815162342 Mar 01 '21 at 16:52
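Following up on the comment above: since `fork()` is what causes the child to share the event loop's internal pipe, switching the start method to `"spawn"` (a fresh interpreter that inherits no loop state) sidesteps the problem. A minimal sketch of just that idea, not a full fix for the program above:

```python
import multiprocessing as mp
import time


def worker():
    # Same looping worker as in the question
    while True:
        print("Test")
        time.sleep(1)


if __name__ == "__main__":
    # "spawn" starts a fresh interpreter, so the child does not inherit
    # the parent's event-loop state (or its signal pipe) the way fork() does
    ctx = mp.get_context("spawn")
    p = ctx.Process(target=worker)
    p.start()
    time.sleep(1)
    p.terminate()  # SIGTERM is delivered to the child only
    p.join()
    print("child exit code:", p.exitcode)  # negative value: killed by that signal
```

With a spawn context, `p.terminate()` affects only the child; the parent's `add_signal_handler()` callbacks no longer fire for the child's SIGTERM.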
