7

I'm trying to figure out if it's possible throw a custom exception into a running asyncio task, similarly to what is achieved by Task.cancel(self) which schedules a CancelledError to be raised in the underlying coroutine.

I came across Task.get_coro().throw(exc), but calling it seems like opening a big can of worms as we may leave the task in a bad state. Especially considering all the machinery that happens when a task is throwing CancelledError into its coroutine.

Consider the following example:

import asyncio

class Reset(Exception):
    pass

async def infinite():
    while True:
        try:
            print('work')
            await asyncio.sleep(1)
            print('more work')
        except Reset:
            print('reset')
            continue
        except asyncio.CancelledError:
            print('cancel')
            break

async def main():
    infinite_task = asyncio.create_task(infinite())
    await asyncio.sleep(0)  # Allow infinite_task to enter its work loop.
    infinite_task.get_coro().throw(Reset())
    await infinite_task

asyncio.run(main())

## OUTPUT ##
# "work"
# "reset"
# "work"
# hangs forever ... bad :(

Is what I try to do even feasible? It feels as if I shouldn't be manipulating the underlying coroutine like this. Any workaround?

Machavity
  • 30,841
  • 27
  • 92
  • 100
Jaanus Varus
  • 3,508
  • 3
  • 31
  • 49

1 Answers1

3

There's no way to throw a custom exception into a running task. You shouldn't mess with .throw - it's a detail of implementation and altering it will probably break something.

If you want to pass information (about reset) into the task, do it trough an argument. Here's how it can be implemented:

import asyncio
from contextlib import suppress


async def infinite(need_reset):
    try:
        while True:
            inner_task = asyncio.create_task(inner_job())

            await asyncio.wait(
                [
                    need_reset.wait(),
                    inner_task
                ], 
                return_when=asyncio.FIRST_COMPLETED
            )

            if need_reset.is_set():
                print('reset')
                await cancel(inner_task)
                need_reset.clear()
    except asyncio.CancelledError:
        print('cancel')
        raise  # you should never suppress, see:
               # https://stackoverflow.com/a/33578893/1113207


async def inner_job():
    print('work')
    await asyncio.sleep(1)
    print('more work')


async def cancel(task):
    # more info: https://stackoverflow.com/a/43810272/1113207
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task


async def main():
    need_reset = asyncio.Event()
    infinite_task = asyncio.create_task(infinite(need_reset))

    await asyncio.sleep(1.5)
    need_reset.set()

    await asyncio.sleep(1.5)
    await cancel(infinite_task)


asyncio.run(main())

Output:

work
more work
work
reset
work
more work
work
cancel
Mikhail Gerasimov
  • 36,989
  • 16
  • 116
  • 159
  • Is this (the fact that you shouldn't use `throw` on the coroutine of a task) documented somewhere on the Python docs? – Anakhand May 21 '22 at 14:58
  • 1
    @Anakhand I don't know, tbh. But I'm pretty sure about it. A long time ago, I called [`set_exception`](https://docs.python.org/3/library/asyncio-future.html#asyncio.Future.set_exception) on a task in one of my SO answers, thinking it should be possible since the `Task` is a subclass of a `Future`. Someone posted my snippet on a Google Questions group about asyncio asking if it's ok. Guido (the Python creator) himself came and said, no, no one should use `set_exception` on a `Task` despite it's inherited from a `Future` :) But, yea, I don't think these things are documented anywhere. – Mikhail Gerasimov May 21 '22 at 19:47