
I have two tasks. When one task raises an error, I wish to restart them both. Is the following the appropriate way to catch an exception propagated by one task, and restart the gather for the two tasks?

import asyncio

async def foo():
    while True:
        await asyncio.sleep(1)
        print("foo")

async def bar():
    for _ in range(3):
        await asyncio.sleep(1)
        print("bar")
    raise ValueError


async def main():
    while True:
        footask = asyncio.create_task(foo())
        bartask = asyncio.create_task(bar())    
        bothtasks = asyncio.gather(footask, bartask)
        try:
            await bothtasks 
        except ValueError:
            print("caught ValueError")  
            try:
                footask.cancel()
            except asyncio.CancelledError:
                pass


asyncio.run(main())

Basically asyncio intentionally doesn't cancel the other tasks in a gather when one task raises an error. So, since I can't think of anything better, I manually cancel the other task(s) with task.cancel() and handle the asyncio.CancelledError myself.

I'm just not convinced this is the intended use of the api, insights appreciated.

Edit:-

The asyncio 3.7 docs read:

If gather() is cancelled, all submitted awaitables (that have not completed yet) are also cancelled.

But the behaviour I observe when I replace footask.cancel() with bothtasks.cancel() is that on every iteration of the while loop an additional foo keeps running, i.e. foo appears not to be cancelled by cancelling the gather. The output looks something like this:

foo
bar
foo
bar
foo
bar
caught ValueError
foo
foo
bar
foo
foo
bar
foo
foo
bar
caught ValueError
foo
foo
foo
bar
foo
foo
foo
bar
foo
foo
foo
bar
caught ValueError
...
jsstuball

2 Answers


The standard idiom to ensure that the tasks have processed their cancellation is to add a gather(*tasks, return_exceptions=True) following the cancellation. For example:

async def main():
    while True:
        footask = asyncio.create_task(foo())
        bartask = asyncio.create_task(bar())    
        tasks = (footask, bartask)  # or a list comprehension, etc.
        try:
            await asyncio.gather(*tasks)
        except ValueError:
            print("caught ValueError")  
            for t in tasks:
                t.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)

Note that you might want to do that for all exceptions, not just ValueError; otherwise a task failing with a non-ValueError exception would still leave the other tasks running.
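As a sketch of that broader handling (the `worker` coroutine below is hypothetical, for illustration only), catching `Exception` ensures the siblings are cancelled whichever task fails first, and with whatever exception type:

```python
import asyncio

async def worker(name, fail=False):
    # hypothetical worker used only for illustration
    await asyncio.sleep(0.1)
    if fail:
        raise RuntimeError(name)
    return name

async def main():
    tasks = [asyncio.create_task(worker("a")),
             asyncio.create_task(worker("b", fail=True))]
    try:
        await asyncio.gather(*tasks)
    except Exception as exc:  # any exception, not just ValueError
        for t in tasks:
            t.cancel()
        # let the tasks process their cancellation
        await asyncio.gather(*tasks, return_exceptions=True)
        print(f"cancelled siblings after {type(exc).__name__}")

asyncio.run(main())
```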

user4815162342
  • Great, thanks. (Suspect it's a typo where keyword param `ignore_exceptions` should be `return_exceptions` in the first sentence) – jsstuball Dec 26 '18 at 06:49

When the exception happens, footask is not cancelled because, as you can read in the docs:

If return_exceptions is False (default), the first raised exception is immediately propagated to the task that awaits on gather(). Other awaitables in the aws sequence won’t be cancelled and will continue to run.

So we should manually cancel footask and then wait until it is actually cancelled:

async def main():
    while True:
        footask = asyncio.create_task(foo())
        bartask = asyncio.create_task(bar())    
        bothtasks = asyncio.gather(footask, bartask)
        try:
            await bothtasks 
        except ValueError:
            print("caught ValueError")

            footask.cancel()  # cancel() only marks the task for cancellation
            try:
                await footask  # wait until it has actually been cancelled
            except asyncio.CancelledError:
                pass
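The comment above is worth stressing: `task.cancel()` only schedules a `CancelledError` to be thrown into the task; the cancellation takes effect only once the task runs again. A minimal self-contained demonstration:

```python
import asyncio

async def demo():
    task = asyncio.create_task(asyncio.sleep(10))
    await asyncio.sleep(0)   # let the task start
    task.cancel()            # only requests cancellation
    print(task.cancelled())  # False: not processed yet
    try:
        await task           # drive the task until it observes the cancellation
    except asyncio.CancelledError:
        pass
    print(task.cancelled())  # True: cancellation has taken effect

asyncio.run(demo())
```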

Update:

I wrote advanced_gather, which acts like gather but has an additional kwarg cancel_on_exception to cancel every task when one of them raises an exception. Full code:

import asyncio


async def advanced_gather(
        *aws, 
        loop=None, 
        return_exceptions=False, 
        cancel_on_exception=False
    ):

    tasks = [
        asyncio.ensure_future(aw, loop=loop) 
        for aw 
        in aws
    ]

    try:
        return await asyncio.gather(
            *tasks, 
            loop=loop, 
            return_exceptions=return_exceptions
        )    
    except Exception:
        if cancel_on_exception:            
            for task in tasks:
                if not task.done():
                    task.cancel()

            await asyncio.gather(
                *tasks, 
                loop=loop, 
                return_exceptions=True
            )
        raise


async def foo():
    while True:
        await asyncio.sleep(1)
        print("foo")


async def bar():
    for _ in range(3):
        await asyncio.sleep(1)
        print("bar")
    raise ValueError


async def main():
    while True:
        try:
            await advanced_gather(
                foo(),
                bar(),
                cancel_on_exception=True
            )
        except ValueError:
            print("caught ValueError")


asyncio.run(main())

Different cases of what can happen:

import asyncio
from contextlib import asynccontextmanager, suppress


async def test(_id, raise_exc=False):
    if raise_exc:
        print(f'we raise RuntimeError inside {_id}')
        raise RuntimeError('!')

    try:
        await asyncio.sleep(0.2)
    except asyncio.CancelledError:
        print(f'cancelledError was raised inside {_id}')
        raise
    else:
        print(f'everything calm inside {_id}')


@asynccontextmanager
async def prepared_stuff(foo_exc=False):
    foo = asyncio.create_task(test('foo', raise_exc=foo_exc))
    bar = asyncio.create_task(test('bar'))
    gather = asyncio.gather(
        foo,
        bar
    )
    await asyncio.sleep(0)  # make sure everything started

    yield (foo, bar, gather)

    try:
        await gather
    except Exception as exc:
        print(f'gather raised {type(exc)}')
    finally:
        # make sure both tasks finished:
        await asyncio.gather(
            foo, 
            bar, 
            return_exceptions=True
        )

    print('')


# ----------------------------------------------

async def everything_calm():
    async with prepared_stuff() as (foo, bar, gather):
        print('everything_calm:')


async def foo_raises_exception():
    async with prepared_stuff(foo_exc=True) as (foo, bar, gather):
        print('foo_raises_exception:')


async def foo_cancelled():
    async with prepared_stuff() as (foo, bar, gather):
        print('foo_cancelled:')
        foo.cancel()


async def gather_cancelled():
    async with prepared_stuff() as (foo, bar, gather):
        print('gather_cancelled:')
        gather.cancel()


async def main():
    await everything_calm()
    await foo_raises_exception()
    await foo_cancelled()
    await gather_cancelled()


asyncio.run(main())
Mikhail Gerasimov
  • Thanks for highlighting the point about `task.cancel()` not immediately cancelling, but rather ensuring the next time the task is awoken it will raise a `CancelledError`. However, all these nested `try`/`except` blocks don't seem to scale very well with _n_ tasks in a gather. Any ideas about the point made in the Edit? – jsstuball Dec 25 '18 at 04:05
  • Suggest calling `asyncio.ensure_future` instead of `create_task`. While `ensure_future` is often misused when `create_task` would suffice, in this case the former is actually appropriate. Since `gather` accepts arbitrary awaitables and not just coroutine objects, you need to call `ensure_future` to convert them to futures. (Using `create_task` would unnecessarily restrict `advanced_gather` to just coroutines.) – user4815162342 Dec 25 '18 at 16:24
  • I'm still having difficulty coming up with code to verify this line in the docs: _If gather() is cancelled, all submitted awaitables (that have not completed yet) are also cancelled._ – jsstuball Dec 26 '18 at 07:39
  • @JSStuball I updated the answer and added a snippet showing the different cases of what can happen. You can check it and compare with the documentation. The situation where gather() is cancelled is shown in the `gather_cancelled()` coroutine. – Mikhail Gerasimov Dec 26 '18 at 12:21