66

I have an event loop that runs some co-routines as part of a command line tool. The user may interrupt the tool with the usual Ctrl + C, at which point I want to clean up properly after the interrupted event loop.

Here's what I tried.

import asyncio


@asyncio.coroutine
def shleepy_time(seconds):
    print("Shleeping for {s} seconds...".format(s=seconds))
    yield from asyncio.sleep(seconds)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # Side note: Apparently, async() will be deprecated in 3.4.4.
    # See: https://docs.python.org/3.4/library/asyncio-task.html#asyncio.async
    tasks = [
        asyncio.async(shleepy_time(seconds=5)),
        asyncio.async(shleepy_time(seconds=10))
    ]

    try:
        loop.run_until_complete(asyncio.gather(*tasks))
    except KeyboardInterrupt as e:
        print("Caught keyboard interrupt. Canceling tasks...")

        # This doesn't seem to be the correct solution.
        for t in tasks:
            t.cancel()
    finally:
        loop.close()

Running this and hitting Ctrl + C yields:

$ python3 asyncio-keyboardinterrupt-example.py 
Shleeping for 5 seconds...
Shleeping for 10 seconds...
^CCaught keyboard interrupt. Canceling tasks...
Task was destroyed but it is pending!
task: <Task pending coro=<shleepy_time() running at asyncio-keyboardinterrupt-example.py:7> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(1)() at /usr/local/Cellar/python3/3.4.3/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py:587]>
Task was destroyed but it is pending!
task: <Task pending coro=<shleepy_time() running at asyncio-keyboardinterrupt-example.py:7> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(0)() at /usr/local/Cellar/python3/3.4.3/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py:587]>

Clearly, I didn't clean up correctly. I thought perhaps calling cancel() on the tasks would be the way to do it.

What's the correct way to clean up after an interrupted event loop?

vaultah
  • 44,105
  • 12
  • 114
  • 143
Nick Chammas
  • 11,843
  • 8
  • 56
  • 115

5 Answers5

59

When you CTRL+C, the event loop gets stopped, so your calls to t.cancel() don't actually take effect. For the tasks to be cancelled, you need to start the loop back up again.

Here's how you can handle it:

import asyncio

@asyncio.coroutine
def shleepy_time(seconds):
    print("Shleeping for {s} seconds...".format(s=seconds))
    yield from asyncio.sleep(seconds)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # Side note: Apparently, async() will be deprecated in 3.4.4.
    # See: https://docs.python.org/3.4/library/asyncio-task.html#asyncio.async
    tasks = asyncio.gather(
        asyncio.async(shleepy_time(seconds=5)),
        asyncio.async(shleepy_time(seconds=10))
    )

    try:
        loop.run_until_complete(tasks)
    except KeyboardInterrupt as e:
        print("Caught keyboard interrupt. Canceling tasks...")
        tasks.cancel()
        loop.run_forever()
        tasks.exception()
    finally:
        loop.close()

Once we catch KeyboardInterrupt, we call tasks.cancel() and then start the loop up again. run_forever will actually exit as soon as tasks gets cancelled (note that cancelling the Future returned by asyncio.gather also cancels all the Futures inside of it), because the interrupted loop.run_until_complete call added a done_callback to tasks that stops the loop. So, when we cancel tasks, that callback fires, and the loop stops. At that point we call tasks.exception, just to avoid getting a warning about not fetching the exception from the _GatheringFuture.

dano
  • 91,354
  • 19
  • 222
  • 219
  • Ah, so nothing happens to a task outside of the event loop, not even a cancellation, right? That sounds like a simple rule to keep in mind. – Nick Chammas Jun 10 '15 at 20:59
  • Just tried this out and it seems to work as advertised. Sweet! Side note: I also noticed that if you pass `return_exceptions=True` to `gather()`, you can leave out the call to `tasks.exception()` since the exceptions are being returned as results. – Nick Chammas Jun 10 '15 at 21:00
  • @NickChammas Right, the loop must be running for the cancellation to take effect [(as the noted in the docs)](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel). And yes, in general nothing is going to happen with an `asyncio.Task` unless the loop is actively driving it. Using `return_exceptions=True` is a good trick, as long as you're ok with real exceptions (e.g. something other than `CancelledError`) being thrown from your wrapped coroutines not actually getting raised. – dano Jun 10 '15 at 21:23
  • 2
    dano - Does the behavior you described here apply the same when a coroutine raises a regular exception (as opposed to the user raising a keyboard interrupt)? I'm finding that [the call to `loop.run_forever()` just keeps going and the canceled tasks just run anyway](https://gist.github.com/nchammas/c1486678a0b36f38f22e). Is this expected? – Nick Chammas Jun 15 '15 at 23:58
  • @NickChammas In that example code, the call to `run_in_complete(tasks)` isn't being interrupted, it's actually completing and the `tasks` Future is being marked as done. Because of that, your call to `tasks.cancel` is actually a no-op. That also means your call to `run_forever()` will actually run forever, since there's no pending `run_until_complete` callback to stop it. And since the other tasks weren't cancelled by the `tasks.cancel()` call, they'll just keep right on running when the loop starts up again. – dano Jun 16 '15 at 00:46
  • 1
    @NickChammas Actually, you could probably make a pretty good case that the `_GatheringFuture` returned by `asyncio.gather` should be enhanced to support calling `cancel()` on its children, even if the `_GatheringFuture` itself is already done, to support this use-case. Though I suppose if you want this behavior you may be better off using `asyncio.wait` with the `FIRST_EXCEPTION` option. – dano Jun 16 '15 at 00:53
  • Ah, so the 1 coroutine raising an exception marks the "gathered" future as complete. I understand. Thank you for explaining all this to me. – Nick Chammas Jun 16 '15 at 17:50
  • @NickChammas Yep, that's exactly what's happening. Happy to help :) – dano Jun 17 '15 at 15:40
26

Note for Python 3.7+: The below is now implemented as part of the standard library asyncio.run function – Replace the below with sys.exit(loop.run(amain(loop))) once you are ready to upgrade! (If you want to print the message, simply move that try…except-clause into amain.)

Updated for Python 3.6+: Add call to loop.shutdown_asyncgens to avoid memory leaks by asynchronous generators that weren't fully used.

The following solution, inspired by some of the other answers, should work in almost all cases and does not depend on you manually keeping track of tasks that need to be cleaned up on Ctrl+C:

loop = asyncio.get_event_loop()
try:
    # Here `amain(loop)` is the core coroutine that may spawn any
    # number of tasks
    sys.exit(loop.run_until_complete(amain(loop)))
except KeyboardInterrupt:
    # Optionally show a message if the shutdown may take a while
    print("Attempting graceful shutdown, press Ctrl+C again to exit…", flush=True)
    
    # Do not show `asyncio.CancelledError` exceptions during shutdown
    # (a lot of these may be generated, skip this if you prefer to see them)
    def shutdown_exception_handler(loop, context):
        if "exception" not in context \
        or not isinstance(context["exception"], asyncio.CancelledError):
            loop.default_exception_handler(context)
    loop.set_exception_handler(shutdown_exception_handler)
    
    # Handle shutdown gracefully by waiting for all tasks to be cancelled
    tasks = asyncio.gather(*asyncio.Task.all_tasks(loop=loop), loop=loop, return_exceptions=True)
    tasks.add_done_callback(lambda t: loop.stop())
    tasks.cancel()
    
    # Keep the event loop running until it is either destroyed or all
    # tasks have really terminated
    while not tasks.done() and not loop.is_closed():
        loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

The above code will obtain all currently tasks from the event loop using asyncio.Task.all_tasks and place them in a single combined future using asyncio.gather. All tasks in that future (which are all currently running tasks) are then canceled using the future's .cancel() method. The return_exceptions=True then ensures that all the received asyncio.CancelledError exceptions are stored instead of causing the future to become errored.

The above code will also override the default exception handler to prevent the generated asyncio.CancelledError exceptions from being logged.

Update from 2020-12-17: Dropped compatiblity code for Python 3.5.

ntninja
  • 1,204
  • 16
  • 20
  • 1
    Apparently the core team deprecated using a separate `loop` which `is Not asyncio.get_event_loop()` (and consequently all those `loop=loop` kw params) – nodakai Mar 14 '19 at 11:41
21

In Python 3.7+ it's recommended that you use asyncio.run to start an async main function.

asyncio.run will take care of creating the event loop for your program and ensure that the event loop is closed and all tasks are cleaned when the main function exits (including due to a KeyboardInterrupt exception).

It is roughly analogous to the following (see asyncio/runners.py):

def run(coro, *, debug=False):
    """`asyncio.run` is new in Python 3.7"""
    loop = asyncio.get_event_loop()
    try:
        loop.set_debug(debug)
        return loop.run_until_complete(coro)
    finally:
        try:
            all_tasks = asyncio.gather(*asyncio.all_tasks(loop), return_exceptions=True)
            all_tasks.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                loop.run_until_complete(all_tasks)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()
dcoles
  • 3,785
  • 2
  • 28
  • 26
  • If you `KeyboardInterrupt` `asyncio.run(main())` and `main` handles `asyncio.CancelledError` how do you access its return value? – Kenny May 15 '20 at 02:33
  • 1
    @Kenny I'm not sure I completely understood your question; but you can catch a asyncio.CancelledError like any other exception: `try: asyncio.run(main()); except asyncio.CancelledError as e: print(f'Cancelled: {e}')`. – dcoles Oct 05 '20 at 17:52
5

Unless you are on Windows, set up event-loop based signal handlers for SIGINT (and also SIGTERM so you can run it as a service). In these handlers, you may either exit the event loop immediately, or initiate some kind of cleanup sequence and exit later.

Example in official Python documentation: https://docs.python.org/3.4/library/asyncio-eventloop.html#set-signal-handlers-for-sigint-and-sigterm

Ambroz Bizjak
  • 7,809
  • 1
  • 38
  • 49
  • Can you clarify how this works and why it's preferable to simply capturing `KeyboardInterrupt` as I did in my example? The interrupt seems to work fine. Cleaning up the tasks left behind seems to be the problem. Is it because the event loop is not handling the interrupt itself? – Nick Chammas Jun 10 '15 at 19:43
  • Generally when you have an event loop you should handle all kinds of events within the event loop. I cannot say why KeyboardInterrupt would have problems specifically. Consider though that it may interrupt essentially any code executing within the event loop (but I cannot say that for sure since I do not know the details of the design). – Ambroz Bizjak Jun 10 '15 at 20:31
  • How it works does not seem described in the docs. I suppose it's the "self-pipe" trick, you should look into the Python source if you want to know. – Ambroz Bizjak Jun 10 '15 at 20:34
3

using the signal module to set an asyncio.Event on a signal.SIGINT signal (Ctrl + C) can be a clean way to tell all of your asyncio code to stop naturally. This is especially important because some libraries such as aiohttp need a chance to be run cleanup tasks before the event loop closes.

Here is an example that uses the aiohttp library. There is a asyncio.sleep(5) to prevent the connection from being returned to the pool, to give the user a chance to ctrl+c and simulate a KeyboardInterrupt exception

example code:

import logging
import asyncio
import signal
import random

import aiohttp

logging.basicConfig(level="INFO", format="%(asctime)s %(threadName)-10s %(name)-10s %(levelname)-8s: %(message)s")
logger = logging.getLogger("root")

stop_event = asyncio.Event()

async def get_json(aiohttp_session):

    logger.info("making http request")

    params = {"value": random.randint(0,1000) }
    async with aiohttp_session.get(f'https://httpbin.org/get', params=params) as response:

        # async with response:
        j = await response.json()
        logger.info("get data: `%s`", j["args"])
        await asyncio.sleep(5)

async def run():

    while not stop_event.is_set():
        async with aiohttp.ClientSession() as aiohttp_session:

            await get_json(aiohttp_session)

    logger.info("stop event was set, sleeping to let aiohttp close it's connections")
    await asyncio.sleep(0.1)
    logger.info("sleep finished, returning")


def inner_ctrl_c_signal_handler(sig, frame):
    '''
    function that gets called when the user issues a
    keyboard interrupt (ctrl+c)
    '''

    logger.info("SIGINT caught!")
    stop_event.set()

# experiment with commenting out this line and ctrl+c-ing the script
# to see how you get an "event loop is closed" error
signal.signal(signal.SIGINT, inner_ctrl_c_signal_handler)

asyncio.run(run())

without the signal.signal call:

> python C:\Users\mark\Temp\test_aiohttp.py
2021-03-06 22:21:08,684 MainThread root       INFO    : making http request
2021-03-06 22:21:09,132 MainThread root       INFO    : get data: `{'value': '500'}`
Traceback (most recent call last):
  File "C:\Users\auror\Temp\test_aiohttp.py", line 52, in <module>
    asyncio.run(run())
  File "c:\python39\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "c:\python39\lib\asyncio\base_events.py", line 629, in run_until_complete
    self.run_forever()
  File "c:\python39\lib\asyncio\windows_events.py", line 316, in run_forever
    super().run_forever()
  File "c:\python39\lib\asyncio\base_events.py", line 596, in run_forever
    self._run_once()
  File "c:\python39\lib\asyncio\base_events.py", line 1854, in _run_once
    event_list = self._selector.select(timeout)
  File "c:\python39\lib\asyncio\windows_events.py", line 434, in select
    self._poll(timeout)
  File "c:\python39\lib\asyncio\windows_events.py", line 783, in _poll
    status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
KeyboardInterrupt
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x000001CFFD75BB80>
Traceback (most recent call last):
  File "c:\python39\lib\asyncio\proactor_events.py", line 116, in __del__
    self.close()
  File "c:\python39\lib\asyncio\proactor_events.py", line 108, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "c:\python39\lib\asyncio\base_events.py", line 746, in call_soon
    self._check_closed()
  File "c:\python39\lib\asyncio\base_events.py", line 510, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

with it:


> python C:\Users\mark\Temp\test_aiohttp.py
2021-03-06 22:20:29,656 MainThread root       INFO    : making http request
2021-03-06 22:20:30,106 MainThread root       INFO    : get data: `{'value': '367'}`
2021-03-06 22:20:35,122 MainThread root       INFO    : making http request
2021-03-06 22:20:35,863 MainThread root       INFO    : get data: `{'value': '489'}`
2021-03-06 22:20:38,695 MainThread root       INFO    : SIGINT caught!
2021-03-06 22:20:40,867 MainThread root       INFO    : stop event was set, sleeping to let aiohttp close it's connections
2021-03-06 22:20:40,962 MainThread root       INFO    : sleep finished, returning

mgrandi
  • 3,389
  • 1
  • 18
  • 17