I'm watching "import asyncio: Learn Python's AsyncIO #3 - Using Coroutines". The instructor gave the following example:

import asyncio
import datetime

async def keep_printing(name):
    while True:
        print(name, end=" ")
        print(datetime.datetime.now())
        await asyncio.sleep(0.5)

async def main():
    group_task = asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third")
                 )
    try:
        await asyncio.wait_for(group_task, 3)
    except asyncio.TimeoutError:
        print("Time's up!")


if __name__ == "__main__":
    asyncio.run(main())

The output had an exception:

First 2020-08-11 14:53:12.079830
Second 2020-08-11 14:53:12.079830
Third 2020-08-11 14:53:12.080828 
First 2020-08-11 14:53:12.580865
Second 2020-08-11 14:53:12.580865
Third 2020-08-11 14:53:12.581901 
First 2020-08-11 14:53:13.081979
Second 2020-08-11 14:53:13.082408
Third 2020-08-11 14:53:13.082408 
First 2020-08-11 14:53:13.583497
Second 2020-08-11 14:53:13.583935
Third 2020-08-11 14:53:13.584946
First 2020-08-11 14:53:14.079666
Second 2020-08-11 14:53:14.081169
Third 2020-08-11 14:53:14.115689
First 2020-08-11 14:53:14.570694
Second 2020-08-11 14:53:14.571668
Third 2020-08-11 14:53:14.635769
First 2020-08-11 14:53:15.074124
Second 2020-08-11 14:53:15.074900
Time's up!
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError

The instructor tried to handle the CancelledError by adding a try/except in keep_printing:

async def keep_printing(name):
    while True:
        print(name, end=" ")
        print(datetime.datetime.now())
        try:
            await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            print(name, "was cancelled!")
            break

However, the same exception still occurred:

# keep printing datetimes
...
First was cancelled!
Second was cancelled!
Third was cancelled!
Time's up!
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError

The instructor then just proceeded to other topics and never came back to this example to show how to fix it. Fortunately, through experimentation, I discovered that we could fix it by adding another try/except under the except asyncio.TimeoutError: in the main async function:

async def main():
    group_task = asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third")
                 )
    try:
        await asyncio.wait_for(group_task, 3)
    except asyncio.TimeoutError:
        print("Time's up!")
        try:
            await group_task
        except asyncio.CancelledError:
            print("Main was cancelled!")

The final output was:

# keep printing datetimes
...
First was cancelled!
Second was cancelled!
Third was cancelled!
Time's up!
Main was cancelled!

In fact, with this version of main, we don't even need the try...except asyncio.CancelledError in keep_printing. It still works fine without it.

Why was that? Why did catching CancelledError in main work but not in keep_printing? The way that the video instructor dealt with this exception only made me more confused. He didn't need to change any code of keep_printing in the first place!

Vadim Kotov
user141240

3 Answers

Let's find out what's going on:

  1. This code schedules the three coroutines for execution and returns a Future object, group_task (an instance of the internal class _GatheringFuture), that aggregates their results.
group_task = asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third")
                 )
  2. This code waits for the future to complete, with a timeout. If the timeout expires, wait_for cancels the future and raises asyncio.TimeoutError.
    try:
        await asyncio.wait_for(group_task, 3)
    except asyncio.TimeoutError:
        print("Time's up!")
  3. The timeout occurs. Looking inside the asyncio library's tasks.py, wait_for does the following:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
...
await waiter
...
await _cancel_and_wait(fut, loop=loop)  # _GatheringFuture.cancel() inside
raise exceptions.TimeoutError()
  4. When _GatheringFuture.cancel() is called and any child tasks were actually cancelled, the cancellation is propagated as a CancelledError:
class _GatheringFuture(futures.Future):
    ...
    def cancel(self):
        ...
        for child in self._children:
            if child.cancel():
                ret = True
        if ret:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return ret

And later, once the children have finished:

...
if outer._cancel_requested:
    # If gather is being cancelled we must propagate the
    # cancellation regardless of *return_exceptions* argument.
    # See issue 32684.
    outer.set_exception(exceptions.CancelledError())
else:
    outer.set_result(results)
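  5. So the CancelledError is set on the gathering future itself, whatever the child coroutines do with their own cancellation. That is why catching it in keep_printing does not help. The more correct approach is to retrieve the result or exception from the gathering future: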
async def main():
    group_task = asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third")
                 )
    try:
        await asyncio.wait_for(group_task, 3)
    except asyncio.TimeoutError:
        print("Time's up!")

    try:
        result = await group_task
    except asyncio.CancelledError:
        print("Gather was cancelled")
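
The steps above can be condensed into a short, self-checking sketch. It is not the instructor's code: keep_printing is changed here to append to a list instead of printing timestamps, and run_with_timeout is a hypothetical helper name, both introduced only so the outcome is easy to inspect. The point it illustrates is that awaiting group_task after the timeout retrieves the CancelledError stored on the gathering future.

```python
import asyncio


async def keep_printing(name, log):
    # Illustrative variant of the question's coroutine: it records each
    # tick in a list instead of printing a timestamp.
    while True:
        log.append(name)
        await asyncio.sleep(0.05)


async def run_with_timeout(timeout):
    # Hypothetical helper wrapping the pattern from step 5.
    log = []
    events = []
    group_task = asyncio.gather(
        keep_printing("First", log),
        keep_printing("Second", log),
        keep_printing("Third", log),
    )
    try:
        await asyncio.wait_for(group_task, timeout)
    except asyncio.TimeoutError:
        events.append("timeout")

    # Awaiting the gathering future again retrieves its stored exception.
    try:
        await group_task
    except asyncio.CancelledError:
        events.append("gather cancelled")
    return events, log


if __name__ == "__main__":
    events, log = asyncio.run(run_with_timeout(0.2))
    print(events)
```

Note that keep_printing does not catch CancelledError at all here; only the second await in the helper deals with it.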
alex_noname

I think you need to put await before asyncio.gather. So this call taken from your code:

    group_task = asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third")
                 )

Needs to be changed into:

    group_task = await asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third")
                 )

Not sure why, I'm still learning this stuff.

RatajS
stacknip

When aw is cancelled due to a timeout, wait_for waits for aw to actually be cancelled (this changed in version 3.7). You can handle the CancelledError inside your coroutine and catch the TimeoutError in the caller.
Example:

import asyncio
import datetime

async def keep_printing(name):
    print(name, datetime.datetime.now())
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # Handle the cancellation that wait_for() triggers on timeout.
        print("done")

async def main():
    try:
        await asyncio.wait_for(keep_printing("First"), timeout=3)
    except asyncio.TimeoutError:
        print("timed out")


if __name__ == "__main__":
    asyncio.run(main())

gather() is used to collect results from Tasks or Futures, but here each coroutine is an infinite loop and never returns a result. If any Task or Future from the aws sequence is cancelled (which is what happens with wait_for), it is treated as if it raised CancelledError; the gather() call itself is not cancelled in this case. This prevents the cancellation of one submitted Task/Future from causing other Tasks/Futures to be cancelled.
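
Here is a small sketch of the gather() behaviour described above, for the case where a single child is cancelled directly. The names worker and cancel_one_child are made up for this illustration. The assumption being demonstrated is that awaiting the gather raises CancelledError while the other child keeps running to completion.

```python
import asyncio


async def worker(name, delay, finished):
    # Hypothetical child coroutine: sleeps, then records that it finished.
    await asyncio.sleep(delay)
    finished.append(name)
    return name


async def cancel_one_child():
    finished = []
    first = asyncio.create_task(worker("First", 0.05, finished))
    second = asyncio.create_task(worker("Second", 0.05, finished))
    group_task = asyncio.gather(first, second)

    await asyncio.sleep(0)  # give both children a chance to start
    first.cancel()          # cancel one child only, not the gather itself

    try:
        await group_task
        outcome = "completed"
    except asyncio.CancelledError:
        outcome = "CancelledError raised"

    # The sibling was not cancelled along with "First"; let it finish.
    await second
    return outcome, second.cancelled(), finished


if __name__ == "__main__":
    print(asyncio.run(cancel_one_child()))
```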
To protect the gather() call from cancellation, you can wrap it in asyncio.shield().

import asyncio
import datetime


async def keep_printing(name):
    while True:
        print(name, datetime.datetime.now())
        try:
            await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            print(f"canceled {name}")
            return None

async def main():
    group_task = asyncio.shield(asyncio.gather(
                     keep_printing("First"),
                     keep_printing("Second"),
                     keep_printing("Third"))
                    )
    try:
        await asyncio.wait_for(group_task, 3)
    except asyncio.TimeoutError:
        print("Done")


if __name__ == "__main__":
    asyncio.run(main())
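
One thing to keep in mind with shield() is that the inner gather keeps running after the timeout, so it still has to be stopped at some point. The sketch below is one possible way to do that explicitly. It assumes a reference to the inner future is kept; ticker and shielded_run are hypothetical names used only for this illustration.

```python
import asyncio


async def ticker(name, log):
    # Hypothetical stand-in for keep_printing that logs to a list.
    try:
        while True:
            log.append(name)
            await asyncio.sleep(0.05)
    except asyncio.CancelledError:
        log.append(name + " cancelled")
        raise  # re-raise so the task really ends up cancelled


async def shielded_run(timeout):
    log = []
    still_running = False
    # Keep a reference to the inner future so it can be cancelled later.
    inner = asyncio.gather(ticker("First", log), ticker("Second", log))
    try:
        await asyncio.wait_for(asyncio.shield(inner), timeout)
    except asyncio.TimeoutError:
        # Only the shield wrapper was cancelled; the gather is untouched.
        still_running = not inner.done()

    # Stop the children explicitly and retrieve the resulting exception.
    inner.cancel()
    try:
        await inner
    except asyncio.CancelledError:
        pass
    return still_running, log


if __name__ == "__main__":
    print(asyncio.run(shielded_run(0.2)))
```

Without the explicit inner.cancel(), the children would only be cancelled when asyncio.run() shuts the loop down, as in the example above.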