
I'm trying to add some code to my existing asyncio loop to provide for a clean shutdown on Ctrl-C. Below is an abstraction of the sort of thing it's doing.

import asyncio, signal

async def task1():
    print("Starting simulated task1")
    await asyncio.sleep(5)
    print("Finished simulated task1")

async def task2():
    print("Starting simulated task2")
    await asyncio.sleep(5)
    print("Finished simulated task2")

async def tasks():
    await task1()
    await task2()

async def task_loop():
    try:
        while True:
            await asyncio.shield(tasks())
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop")
        raise

async def aiomain():
    loop = asyncio.get_running_loop()
    task = asyncio.Task(task_loop())
    loop.add_signal_handler(signal.SIGINT, task.cancel)
    await task

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        pass

#def main():
#    try:
#        loop = asyncio.get_event_loop()
#        loop.create_task(aiomain())
#        loop.run_forever()
#    except asyncio.CancelledError:
#        pass

if __name__ == '__main__':
    main()

In this example, imagine that the sequence of task1 and task2 needs to be finished once it's started, or some artifacts will be left in an inconsistent state. (Hence the asyncio.shield wrapper around calling tasks.)

With the code as above, if I interrupt the script soon after it starts, when it has just printed Starting simulated task1, the loop stops and task2 never gets started. If I switch to the commented-out version of main, the script never exits, even though the task loop is properly cancelled and nothing further happens for at least several minutes. That version is a bit of progress, though, in that it at least finishes any in-progress sequence of task1 and task2.

Some possible solutions from brainstorming, though I still get the feeling there must be something simpler that I'm missing:

  • Create a wrapper around asyncio.shield which increments a counter synchronized by an asyncio.Condition object, runs the shielded function, then decrements the counter. Then, in aiomain, a CancelledError handler would wait for the counter to reach zero before reraising the exception; a rough sketch of this idea follows after this list. (In an implementation, I would probably combine all the parts of this into one class, with __aexit__ implementing the wait-for-zero-on-CancelledError logic.)
  • Skip using asyncio's cancellation mechanism entirely, and instead use an asyncio.Event or similar to allow for interruption points or interruptible sleeps. This seems more invasive, though, since it would require me to specify which points are considered interruptible, rather than declaring which sequences need to be shielded from cancellation.
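
Here is a rough, untested sketch of what I have in mind for the first option (the ShieldTracker name and its methods are just placeholders of mine, not anything from asyncio):

import asyncio

class ShieldTracker:
    # Counts in-flight shielded calls so a CancelledError handler can wait
    # for them to drain before letting the exception propagate.
    def __init__(self):
        self._count = 0
        self._cond = asyncio.Condition()

    async def _run(self, coro):
        try:
            return await coro
        finally:
            # Runs inside the shielded task, so the decrement happens only
            # once the protected work has actually finished.
            async with self._cond:
                self._count -= 1
                self._cond.notify_all()

    async def shield(self, coro):
        self._count += 1  # no await between here and the shield call
        return await asyncio.shield(self._run(coro))

    async def wait_idle(self):
        # Intended to be called from a CancelledError handler before re-raising.
        async with self._cond:
            await self._cond.wait_for(lambda: self._count == 0)

The idea would be for aiomain to call wait_idle() in its CancelledError handler before re-raising.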
Daniel Schepler

2 Answers


This is a very good question. I have learned some things while working out an answer, so I hope you are still monitoring this thread.

The first thing to investigate is how shield() actually works. On this point the docs are confusing, to say the least; I couldn't figure it out until I read the standard library test code in test_tasks.py. Here is my understanding:

Consider this code fragment:

async def coro_a():
    await asyncio.shield(task_b())
    ...
task_a = asyncio.create_task(coro_a())
task_a.cancel()

When the task_a.cancel() statement is executed, task_a is indeed cancelled. The await statement throws a CancelledError immediately, without waiting for task_b to finish. But task_b continues to run. The outer task (a) stops but the inner task (b) doesn't.
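
Here is a minimal, self-contained toy example of my own (not part of your program) that shows the effect:

import asyncio

async def inner():
    print("inner started")
    await asyncio.sleep(1)
    print("inner finished")    # still prints after the outer task is cancelled

async def outer():
    await asyncio.shield(inner())
    print("outer finished")    # never reached once outer is cancelled

async def main():
    task = asyncio.create_task(outer())
    await asyncio.sleep(0.1)   # give inner() a chance to start
    task.cancel()              # cancels outer(); the shielded inner() keeps running
    try:
        await task
    except asyncio.CancelledError:
        print("outer cancelled")
    await asyncio.sleep(1.5)   # keep the loop alive long enough for inner() to finish

asyncio.run(main())

It prints inner started, then outer cancelled, and finally inner finished, because the shielded task keeps running as long as the event loop is alive.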

Here is a modified version of your program that illustrates this. The major change is to insert a wait in your CancelledError exception handler, to keep your program alive a few seconds longer. I'm running on Windows and that's why I changed your signal handler a little bit also, but that's a minor point. I also added time stamps to the print statements.

import asyncio
import signal
import time

async def task1():
    print("Starting simulated task1", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task1", time.time())

async def task2():
    print("Starting simulated task2", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task2", time.time())

async def tasks():
    await task1()
    await task2()

async def task_loop():
    try:
        while True:
            await asyncio.shield(tasks())
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop", time.time())
        raise

async def aiomain():
    task = asyncio.create_task(task_loop())
    KillNicely(task)
    try:
        await task
    except asyncio.CancelledError:
        print("Caught CancelledError", time.time())
        await asyncio.sleep(5.0)
        raise

class KillNicely:
    def __init__(self, cancel_me):
        self.cancel_me = cancel_me
        self.old_sigint = signal.signal(signal.SIGINT,
                                        self.trap_control_c)

    def trap_control_c(self, signum, stack):
        if signum != signal.SIGINT:
            self.old_sigint(signum, stack)
        else:
            print("Got Control-C", time.time())
            print(self.cancel_me.cancel())

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        print("Program exit, cancelled", time.time())

# Output when Ctrl-C is struck during task1
# 
# Starting simulated task1 1590871747.8977509
# Got Control-C 1590871750.8385916
# True
# Shutting down task loop 1590871750.8425908
# Caught CancelledError 1590871750.8435903
# Finished simulated task1 1590871752.908434
# Starting simulated task2 1590871752.908434
# Program exit, cancelled 1590871755.8488846        

if __name__ == '__main__':
    main()

You can see that your program didn't work because it exited as soon as task_loop was cancelled, before task1 and task2 had a chance to finish. They were still there all along (or rather they would have been there, if the program continued to run).

This illustrates how shield() and cancel() interact, but it doesn't actually solve your stated problem. For that, I think, you need an awaitable object that you can use to keep the program alive until the vital tasks are finished. This object needs to be created at the top level and passed down the stack to the place where the vital tasks are executing. Here is a program that is similar to yours, but performs the way you want.

I did three runs: (1) control-C during task1, (2) control-C during task2, (3) control-C after both tasks were finished. In the first two cases the program continued until task2 was finished. In the third case it ended immediately.

import asyncio
import signal
import time

async def task1():
    print("Starting simulated task1", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task1", time.time())

async def task2():
    print("Starting simulated task2", time.time())
    await asyncio.sleep(5)
    print("Finished simulated task2", time.time())

async def tasks(kwrap):
    fut = asyncio.get_running_loop().create_future()
    kwrap.awaitable = fut
    await task1()
    await task2()
    fut.set_result(1)

async def task_loop(kwrap):
    try:
        while True:
            await asyncio.shield(tasks(kwrap))
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop", time.time())
        raise

async def aiomain():
    kwrap = KillWrapper()
    task = asyncio.create_task(task_loop(kwrap))
    KillNicely(task)
    try:
        await task
    except asyncio.CancelledError:
        print("Caught CancelledError", time.time())
        await kwrap.awaitable
        raise

class KillNicely:
    def __init__(self, cancel_me):
        self.cancel_me = cancel_me
        self.old_sigint = signal.signal(signal.SIGINT,
                                        self.trap_control_c)

    def trap_control_c(self, signum, stack):
        if signum != signal.SIGINT:
            self.old_sigint(signum, stack)
        else:
            print("Got Control-C", time.time())
            print(self.cancel_me.cancel())

class KillWrapper:
    def __init__(self):
        self.awaitable = asyncio.get_running_loop().create_future()
        self.awaitable.set_result(0)

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        print("Program exit, cancelled", time.time())

# Run 1 Control-C during task1
# Starting simulated task1 1590872408.6737766
# Got Control-C 1590872410.7344952
# True
# Shutting down task loop 1590872410.7354996
# Caught CancelledError 1590872410.7354996
# Finished simulated task1 1590872413.6747622
# Starting simulated task2 1590872413.6747622
# Finished simulated task2 1590872418.6750958
# Program exit, cancelled 1590872418.6750958
#
# Run 2 Control-C during task2
# Starting simulated task1 1590872492.927735
# Finished simulated task1 1590872497.9280624
# Starting simulated task2 1590872497.9280624
# Got Control-C 1590872499.5973852
# True
# Shutting down task loop 1590872499.5983844
# Caught CancelledError 1590872499.5983844
# Finished simulated task2 1590872502.9274273
# Program exit, cancelled 1590872502.9287038
#
# Run 3 Control-C after task2 -> immediate exit
# Starting simulated task1 1590873694.2925708
# Finished simulated task1 1590873699.2928336
# Starting simulated task2 1590873699.2928336
# Finished simulated task2 1590873704.2938952
# Got Control-C 1590873706.0790765
# True
# Shutting down task loop 1590873706.0804725
# Caught CancelledError 1590873706.0804725
# Program exit, cancelled 1590873706.0814824
Paul Cornelius
  • Thanks - I posted my own answer using a similar idea, but with some comments why waiting "at the top level" didn't end up working for me so my solution moved the wait down into the wrapper around `shield`. – Daniel Schepler May 30 '20 at 23:15

Here is what I ended up using:

import asyncio, signal

async def _shield_and_wait_body(coro, finish_event):
    try:
        await coro
    finally:
        finish_event.set()

async def shield_and_wait(coro):
    finish_event = asyncio.Event()
    task = asyncio.shield(_shield_and_wait_body(coro, finish_event))
    try:
        await task
    except asyncio.CancelledError:
        await finish_event.wait()
        raise

def shield_and_wait_decorator(coro_fn):
    return lambda *args, **kwargs: shield_and_wait(coro_fn(*args, **kwargs))

async def task1():
    print("Starting simulated task1")
    await asyncio.sleep(5)
    print("Finished simulated task1")

async def task2():
    print("Starting simulated task2")
    await asyncio.sleep(5)
    print("Finished simulated task2")

@shield_and_wait_decorator
async def tasks():
    await task1()
    await task2()

async def task_loop():
    try:
        while True:
            # Alternative to applying @shield_and_wait_decorator to tasks()
            #await shield_and_wait(tasks())
            await tasks()
            await asyncio.sleep(60)
    except asyncio.CancelledError:
        print("Shutting down task loop")
        raise

def sigint_handler(task):
    print("Cancelling task loop")
    task.cancel()

async def aiomain():
    loop = asyncio.get_running_loop()
    task = asyncio.Task(task_loop())
    loop.add_signal_handler(signal.SIGINT, sigint_handler, task)
    await task

def main():
    try:
        asyncio.run(aiomain())
    except asyncio.CancelledError:
        pass

if __name__ == '__main__':
    main()

Similar to the answer by Paul Cornelius, this inserts a wait for the subtask to finish before allowing the CancelledError to propagate up the call chain. However, it does not require touching the code other than at the point you would be calling asyncio.shield.

(In my actual use case, I had three loops running simultaneously, using an asyncio.Lock to make sure one task or sequence of tasks finished before another would start, plus an asyncio.Condition on that lock for communicating from one coroutine to another. When I tried the approach of waiting in aiomain or main for all shielded tasks to be done, I ran into an issue where a cancelled parent released the lock, after which a shielded task tried to signal the condition variable using that lock and got an error. It also didn't make sense to move acquiring and releasing the lock into the shielded task - that would let task B still run after an interrupt in the sequence: shielded task A starts; the coroutine for task B expires its timer and blocks waiting for the lock; Ctrl-C arrives. Putting the wait at the point of the shield_and_wait call, on the other hand, neatly avoided releasing the lock prematurely.)

One caveat: it seems that shield_and_wait_decorator doesn't work properly on class methods.

Daniel Schepler
  • Tough problem. Your solution looks good and I like the decorator idea. The only significant difference I can see is that in your solution, the task_loop doesn't "know" about the cancellation until tasks() is finished. But I can't see why that would matter to the application logic. I've written a couple of medium-sized asyncio applications over the past two years but I never used shield before. Now I know what it does. Thanks for the challenge. – Paul Cornelius May 31 '20 at 03:50
  • Now for the next challenge - coming up with a way for SIGTERM to act as a "stronger request for shutdown" so for example certain long-running subprocesses can be terminated. Incidentally, in case anyone is curious about the actual use case: https://github.com/dschepler/microbuildd . – Daniel Schepler May 31 '20 at 17:12
  • I'm thinking something roughly along the lines of: in a shielded routine, could run `done, pending = await asyncio.wait({subsubtask, term_event})\nif term_event in done and subsubtask in pending:\n subsubtask.cancel()\n await subsubtask` and then create a wrapper around the asyncio subprocess object where an `__aexit__` handler on `CancelledError` runs `subprocess.terminate()` – Daniel Schepler May 31 '20 at 17:16
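
A rough, untested expansion of the idea in that last comment (run_until_term and term_event are placeholder names, not an existing API) might look like this: race the shielded work against a termination event, and cancel the work if SIGTERM arrives first.

import asyncio, signal

async def run_until_term(coro, term_event):
    # Race the real work against a "terminate now" event set by a SIGTERM handler.
    work = asyncio.create_task(coro)
    term = asyncio.create_task(term_event.wait())
    done, pending = await asyncio.wait({work, term},
                                       return_when=asyncio.FIRST_COMPLETED)
    if term in done and work in pending:
        work.cancel()          # SIGTERM arrived first: cut the work short
        try:
            await work
        except asyncio.CancelledError:
            pass
    else:
        term.cancel()          # work finished normally: drop the watcher
    return work

async def aiomain():
    term_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGTERM, term_event.set)
    # Stand-in for a real long-running subprocess or task:
    await run_until_term(asyncio.sleep(30), term_event)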