If we have 2 asyncio
coroutines, is it possible to use Python multiproessing
to let each of them run in its own process, and allow the coroutines in both processes to be stopped (by calling their stop
method) when the user hits Ctrl+C?
This will be similar to the code below, except that foo.start()
and bar.start()
coroutines should have their own process.
from builtins import KeyboardInterrupt
import asyncio
import multiprocessing
import signal
class App:
def __init__(self, text):
self.text = text
async def start(self):
self.loop_task = asyncio.create_task(self.hello())
await asyncio.wait([self.loop_task])
async def stop(self):
self.loop_task.cancel()
async def hello(self):
while True:
print(self.text)
await asyncio.sleep(2)
if __name__ == '__main__':
foo = App('foo')
bar = App('bar')
# Running in a single process works fine
try:
asyncio.run(asyncio.wait([foo.start(), bar.start()]))
except KeyboardInterrupt:
asyncio.run(asyncio.wait([foo.stop(), bar.stop()]))
Tried using multiprocessing
and signals
, but I am also not sure how to call foo.stop()
and bar.stop()
before the 2 processes terminates.
if __name__ == '__main__':
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
def start_foo():
asyncio.run(foo.start())
def start_bar():
asyncio.run(bar.start())
foo = App('foo')
bar = App('bar')
pool = multiprocessing.Pool(10, init_worker)
try:
print('Starting 2 jobs')
pool.apply_async(start_foo)
pool.apply_async(start_bar)
while True:
time.sleep(1) # is sleeping like this a bad thing?
except KeyboardInterrupt:
print('Caught KeyboardInterrupt, terminating workers')
pool.terminate()
pool.join()
print('Shut down complete')
# Based on https://stackoverflow.com/a/11312948/741099
Using Python 3.9.5 on Ubuntu 20.04
Based on @Will Da Silva's solution, I made tiny modifications to check if asyncio.run(app.stop())
gets called on pressing Ctrl+C
class App:
def __init__(self, text):
self.text = text
async def start(self):
self.loop_task = asyncio.create_task(self.hello())
await asyncio.wait([self.loop_task])
async def stop(self):
self.loop_task.cancel()
print(f'Stopping {self.text}')
async def hello(self):
while True:
print(self.text)
await asyncio.sleep(2)
def f(app):
try:
asyncio.run(app.start())
except KeyboardInterrupt:
asyncio.run(app.stop())
if __name__ == '__main__':
jobs = (App('foo'), App('bar'))
with multiprocessing.Pool(min(len(jobs), os.cpu_count())) as pool:
try:
print(f'Starting {len(jobs)} jobs')
pool.map(f, jobs)
except KeyboardInterrupt:
print('Caught KeyboardInterrupt, terminating workers')
print('Shut down complete')
However, it seems that if I repeat starting and stopping the Python script multiple times, print(f'Stopping {self.text}')
inside app.stop()
does not print to stdout half the time.
Output:
$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Shut down complete
$ python test.py
Starting 2 jobs
bar
foo
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Shut down complete
$ python test.py
Starting 2 jobs
foo
bar
^CCaught KeyboardInterrupt, terminating workers
Stopping bar
Stopping foo
Shut down complete