8

I'm trying 2 ways to stop an infinite loop from running:

  • supervisor_1: task is canceled programatically
  • supervisor_2: task is stopped with Ctrl+C

While supervisor_2 does not throw any errors at when interrupted, I cannot get supervisor_1 from getting Task was destroyed but it is pending!. Any idea why ?

Here is the code:

import asyncio
import aioredis
from functools import partial



class Listener:
    def __init__(self, redis_conn):
        self.redis_conn = redis_conn

    async def forever(self, loop_name):
        counter = 0
        try:
            while True:
                print('{}: {}'.format(loop_name, counter))
                counter += 1
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print('Task Cancelled')
            self.redis_conn.close()
            await self.redis_conn.wait_closed()


async def supervisor_1(redis_conn):
    redis_conn = await redis_conn

    l = Listener(redis_conn)

    task = asyncio.ensure_future(
        asyncio.gather(l.forever('loop_1'), 
                       l.forever('loop_2')))
    await asyncio.sleep(2)
    task.cancel()


async def supervisor_2(redis_conn):
    redis_conn = await redis_conn

    l = Listener(redis_conn)
    await asyncio.gather(l.forever('loop_1'), 
                         l.forever('loop_2'))


if __name__ == '__main__':
    redis_conn = aioredis.create_pool(('localhost', 5003), db=1)

    loop = asyncio.get_event_loop()
    run = partial(supervisor_2, redis_conn=redis_conn)
    task = asyncio.ensure_future(run())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        print('Interruped !')
        task.cancel()
        loop.run_forever()
    finally:
        loop.close()

@update:

Thanks to @Gerasimov, here is a version that fix the problem, but somehow still raise errors from time to time on KeyboardInterrupt:

async def supervisor(redis_conn):
    redis_conn = await redis_conn

    l = Listener(redis_conn)

    task = asyncio.ensure_future(
        asyncio.gather(l.forever('loop_1'), 
                       l.forever('loop_2'))
    )
    await asyncio.sleep(10)
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task

async def kill_tasks():
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task 

and

if __name__ == '__main__':
    redis_conn = aioredis.create_pool(('localhost', 5003), db=1)

    loop = asyncio.get_event_loop()
    run = partial(supervisor, redis_conn=redis_conn)
    task = asyncio.ensure_future(run())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        print('Interruped !')
        loop.run_until_complete(kill_tasks())
    finally:
        loop.close()
Orelus
  • 963
  • 1
  • 13
  • 23

1 Answers1

10

task.cancel() itself doesn't finish the task: it just says to task that CancelledError should be raised inside it and returns immediately. You should call it and await while task would be actually cancelled (while it'll raise CancelledError).

You also shouldn't suppress CancelledError inside task.

Read this answer where I tried to show different ways of working with tasks. For example to cancel some task and await it cancelled you can do:

from contextlib import suppress


task = ...  # remember, task doesn't suppress CancelledError itself

task.cancel()  # returns immediately, we should await task raised CancelledError.

with suppress(asyncio.CancelledError):
    await task  # or loop.run_until_complete(task) if it happens after event loop stopped

# Now when we awaited for CancelledError and handled it, 
# task is finally over and we can close event loop without warning.
Community
  • 1
  • 1
Mikhail Gerasimov
  • 36,989
  • 16
  • 116
  • 159
  • thanks for the link. I updated my answer with what I understood of a fix. But still getting errors (however not always like before) – Orelus May 10 '17 at 12:36
  • 1
    @Orelus, same error as before? Try to move `loop.run_until_complete(kill_tasks())` into finally block, just before `loop.close()`. This should probably solve problem. I'm not sure what your `run()` coroutine do, but probably situation can happen, when it finished, but some tasks are not: in that case on closing event loop you'll get warning even if no `KeyboardInterrupt` happened. – Mikhail Gerasimov May 10 '17 at 20:09
  • Note a downside to using this approach: if the calling task is itself cancelled while it is waiting for the child task to clean up and exit, the `suppress()` call will swallow *that* `CancelledError` instead and, in fact, the `await task` could be aborted early. It looks like current asyncio (as of 3.8 at least) may have no way to avoid this. – Peter Hansen Jan 24 '21 at 17:57