I have an asyncio event loop to which I add signal handlers using loop.add_signal_handler(). The signals I want to catch are SIGINT, SIGHUP and SIGTERM, so that I can gracefully shut down my event loop. From this event loop I want to fork processes using multiprocessing.Process(). I want to be able to terminate such a process p from the event loop using p.terminate(). However, the signal handler catches the SIGTERM issued by p.terminate(), which triggers my shutdown code and leaves p running.
I have not found any solutions to this. Most posts say you should refrain from using termination signals and instead use a multiprocessing.Queue(), passing e.g. None as a sentinel and handling it in the child process. While I see the usefulness and cleanness of this approach, using a multiprocessing.Queue() is not feasible in my case. Am I trying something impossible, or have I missed something?
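For reference, the sentinel approach those posts describe looks roughly like the sketch below. The names sentinel_worker and q are made up for illustration and are not part of my actual code.

```python
import multiprocessing as mp


def sentinel_worker(q):
    # Hypothetical worker: handle items until the None sentinel arrives
    while True:
        item = q.get()
        if item is None:
            break  # Parent asked us to stop
        print("Got", item)


if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=sentinel_worker, args=(q,))
    p.start()
    q.put("job")
    q.put(None)  # Sentinel: tells the worker to exit its loop
    p.join()
```

This only works if the child blocks on the queue regularly, which is the part that does not fit my case.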
I have created a minimal example:
    import asyncio
    import multiprocessing as mp
    import signal
    import time


    class Test():
        def __init__(self):
            self.event_loop = asyncio.get_event_loop()

        def create_mp(self):
            # Wrapper for creating process
            self.p = mp.Process(target=worker)
            self.p.start()

        async def shutdown_mp(self):
            # Shutdown after 5 sec
            await asyncio.sleep(5)
            self.p.terminate()

        async def async_print(self):
            # Simple coroutine printing
            while True:
                await asyncio.sleep(1)
                print("Async_test")

        async def shutdown_event_loop(self, signal):
            # Graceful shutdown, inspiration from roguelynn (Lynn Root)
            print("Received exit signal {}".format(signal.name))
            tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
            # Cancel all tasks
            [task.cancel() for task in tasks]
            print("Cancelling {} outstanding tasks".format(len(tasks)))
            await asyncio.gather(*tasks, return_exceptions=True)
            self.event_loop.stop()

        def run(self):
            # Add signals
            signals = (signal.SIGINT, signal.SIGHUP, signal.SIGTERM)
            for s in signals:
                self.event_loop.add_signal_handler(
                    s, lambda s=s: self.event_loop.create_task(self.shutdown_event_loop(s)))
            # Schedule async task
            self.event_loop.create_task(self.async_print())
            # Start processes
            self.create_mp()
            # Schedule process to be terminated
            self.event_loop.create_task(self.shutdown_mp())
            self.event_loop.run_forever()


    def worker():
        # Simple process
        while True:
            print("Test")
            time.sleep(1)


    if __name__ == "__main__":
        test = Test()
        test.run()
I have not been able to cancel the processes with a simple KeyboardInterrupt. Instead I use pgrep python3 and kill -9 PID.
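One thing that may be worth trying is sketched below. It assumes the fork start method, where the child inherits the handlers and the wakeup file descriptor that add_signal_handler() installed in the parent; that inheritance may be why a SIGTERM sent to p ends up running the parent's shutdown code. The helper name reset_inherited_signals is hypothetical, and I have not verified this against the full example above.

```python
import signal
import time


def reset_inherited_signals():
    # Hypothetical helper, meant to run first thing in the child process.
    # Stop signal bytes from being written to the fd shared with the parent.
    signal.set_wakeup_fd(-1)
    # Restore the default action (terminate) for the signals the parent handles.
    for s in (signal.SIGINT, signal.SIGHUP, signal.SIGTERM):
        signal.signal(s, signal.SIG_DFL)


def worker():
    # Same simple process as above, but with the inherited signal setup undone
    reset_inherited_signals()
    while True:
        print("Test")
        time.sleep(1)
```

With the default dispositions restored, p.terminate() should end the child directly, without the event loop in the parent being notified.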