
The question should be simple enough, but I couldn't find anything about it.

I have an async Python program with a rather long-running task that I want to be able to suspend and resume at arbitrary points (arbitrary meaning, of course, anywhere there's an `await` keyword).

I was hoping there was something along the lines of task.suspend() and task.resume() but it seems there isn't.

Is there an API for this on task- or event-loop-level or would I need to do this myself somehow? I don't want to place an event.wait() before every await...

Peter Mortensen
thisisalsomypassword
  • I think the need for explicit `sleep(0)` probably points to a flaw in how my implementation handles cancellation. (`sleep(0)` is almost always a "code smell" in asyncio code.) Maybe you need a try/except `CancelledError` around the inner `while` loop, and in case of `CancelledError` do `send, message = iter_throw, exception_instance`. That way a cancel that interrupts `Event.wait` will be correctly propagated to the coroutine. – user4815162342 Mar 20 '21 at 09:39
  • Hmm, I think your implementation is fine. I made a minimal example from the asyncio docs for task cancellation using your code, and everything works as expected without using `asyncio.sleep(0)`. However, in my first attempt at the minimal example I made the mistake of `await`ing the suspendable, resulting in a `RuntimeError` since it was already `await`ed in `run_wrapper`. I'm doing this in the actual application as well, so I'm guessing the `RuntimeError` might have been swallowed by uvicorn but resulted in unexpected behaviour. – thisisalsomypassword Mar 20 '21 at 21:45
  • Right, awaiting the suspendable is not allowed because its ownership is taken over by `run_wrapper`, which is in turn owned by the task. `run_wrapper` is only needed because `create_task()` AFAIR requires an actual coroutine. Perhaps I could have passed the suspendable directly to `ensure_future()`, but I didn't feel like experimenting, the code was involved enough as it was. – user4815162342 Mar 20 '21 at 22:19
  • But I'm still a bit worried about a `cancel()` that arrives while `Suspendable.__await__` is suspended in `yield from self._can_run.wait().__await__()`. I think the cancellation will be injected into the `Event.wait` and the `yield from` will raise, so the cancellation will never be propagated to the target coroutine. I thought that was why you needed `sleep(0)` following resume, to ensure that the cancellation occurs after `Event.wait` is done. That should be fixed by the change mentioned in the previous comment, after which `sleep(0)` would no longer be needed. – user4815162342 Mar 20 '21 at 22:19
  • You were right to be worried :). I repeated the test with the minimal example and had overlooked that, while the task got cancelled when it was suspended, the `CancelledError` was not raised inside the coro. The exception is in fact raised at the `yield from` and can be caught with another try/except as you suggested. I will update the code above again to reflect these changes. With this implementation I was able to cancel the task without any additional `asyncio.sleep(0)`, suspended or not. – thisisalsomypassword Mar 21 '21 at 10:57
  • Thanks for pursuing this. I've now incorporated your improvements in my answer. It turns out `ensure_future` [already has](https://github.com/python/cpython/blob/20a5b7e986377bdfd929d7e8c4e3db5847dfdb2d/Lib/asyncio/tasks.py#L639) the equivalent of my `run_wrapper`, so it's not necessary. Also, I added an `is_suspended()` method, but I refrained from adding the task-manipulating methods, because `get_task()` should be sufficient. (Cancellation should now "just work" with a simple `x.get_task().cancel()`.) If that works for you, you can delete the copy of the solution from the question. – user4815162342 Mar 21 '21 at 12:10
  • Sure, will do. Thanks for all your help! – thisisalsomypassword Mar 21 '21 at 14:35
  • Thanks for asking an interesting question! I'm pretty sure similar questions were asked before (so there is interest in that functionality), and have gone unanswered because the solution is not exactly trivial. – user4815162342 Mar 21 '21 at 14:37
  • Yes, I was actually surprised that I didn't find anything 'readymade'. It feels only natural to be able to suspend a task whose literal purpose it is to be suspended! I guess if you let the user do it, there's some risk of having a lot of suspended tasks in the loop, which can be a performance issue. – thisisalsomypassword Mar 22 '21 at 08:07
  • One possible issue is that the additional layer of suspension comes without cooperation from the task, which might cause issues if the coroutine expects to be resumed soon after an event. For example, imagine a coroutine waiting for a socket to become readable in order to read from it. If it gets suspended during the wait, its asyncio task might keep getting woken up on every event loop iteration because AFAIK the event loop polling of file descriptors is level-triggered. That might cause the event loop to never go to sleep and to remain in a kind of busy-loop until the task is force-resumed. – user4815162342 Mar 22 '21 at 09:11
  • Speaking of issues, I think I just stumbled on another one... It seems that when the target coroutine is waiting on some `asyncio.Event` while it is suspended, the `__await__` method is still at `message = yield signal`, hence it never hits the `yield from`. If the task is cancelled in this state, the `CancelledError` is raised on the `yield` and then doesn't make it past the `yield from`, since `_can_run` is still unset. This results in some sort of deadlock. I changed the while to `while not self._can_run.is_set() and not isinstance(message, BaseException):`. – thisisalsomypassword Mar 22 '21 at 10:30
  • This works, but it feels kind of hacky. One could also set `_can_run` in the exception handlers, but this would change the event state and maybe the coro wants to reject the cancellation and would then be unwillingly resumed. – thisisalsomypassword Mar 22 '21 at 10:32
  • The question is what you want to happen when a suspended task is cancelled. My implementation takes suspension seriously and waits for the task to be resumed before delivering the cancellation. (I'm not sure how a deadlock occurs in your usage.) I think it's ok to change the code the way you did, if that's the semantics you need. I might have written the loop condition as `while send is not iter_throw and not self._can_run.is_set()`, but that's equivalent to your formulation in asyncio because the event loop will resume us either with a `None` message or by delivering a `CancelledError` exception. – user4815162342 Mar 22 '21 at 11:04

1 Answer


What you're asking for is possible, but not trivial. First, note that you can never suspend at every await, only at those that actually suspend the coroutine, such as asyncio.sleep() or a stream.read() that has no data ready to return. Awaiting a coroutine immediately starts executing it, and if the coroutine can return immediately, it does so without dropping to the event loop. await only suspends to the event loop if the awaitee (or its awaitee, etc.) requests it. More details in these questions: [1], [2], [3], [4].
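To make the point about await concrete, here is a minimal sketch. The coroutine names (`returns_immediately`, `really_suspends`, `background`) are made up for this illustration. A background task is scheduled first; it only gets a chance to run once `main()` reaches an await that actually yields to the event loop.

```python
import asyncio

order = []

async def returns_immediately():
    # No suspension point inside: this runs to completion synchronously.
    order.append("immediate")
    return 42

async def really_suspends():
    order.append("before sleep")
    await asyncio.sleep(0)  # yields to the event loop once
    order.append("after sleep")

async def background():
    order.append("background")

async def main():
    asyncio.create_task(background())  # scheduled, but not run yet
    await returns_immediately()        # does not drop to the event loop
    order.append("after immediate await")
    await really_suspends()            # background() runs during this await
    return order

result = asyncio.run(main())
print(result)
```

The "background" entry shows up only after "before sleep", so the first await never gave the event loop a chance to run anything else. A suspend request can therefore only take effect at awaits of the second kind.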

With that in mind, you can use the technique from this answer to intercept each resumption of the coroutine with additional code that checks whether the task is paused and, if so, waits for the resume event before proceeding.

import asyncio

class Suspendable:
    def __init__(self, target):
        self._target = target
        self._can_run = asyncio.Event()
        self._can_run.set()
        self._task = asyncio.ensure_future(self)

    def __await__(self):
        target_iter = self._target.__await__()
        iter_send, iter_throw = target_iter.send, target_iter.throw
        send, message = iter_send, None
        # This "while" emulates yield from.
        while True:
            # wait for can_run before resuming execution of self._target
            try:
                while not self._can_run.is_set():
                    yield from self._can_run.wait().__await__()
            except BaseException as err:
                send, message = iter_throw, err

            # continue with our regular program
            try:
                signal = send(message)
            except StopIteration as err:
                return err.value
            else:
                send = iter_send
            try:
                message = yield signal
            except BaseException as err:
                send, message = iter_throw, err

    def suspend(self):
        self._can_run.clear()

    def is_suspended(self):
        return not self._can_run.is_set()

    def resume(self):
        self._can_run.set()

    def get_task(self):
        return self._task

Test:

import time

async def heartbeat():
    while True:
        print(time.time())
        await asyncio.sleep(.2)

async def main():
    task = Suspendable(heartbeat())
    for i in range(5):
        print('suspending')
        task.suspend()
        await asyncio.sleep(1)
        print('resuming')
        task.resume()
        await asyncio.sleep(1)

asyncio.run(main())
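As a rough check of cancellation, the sketch below suspends a task and then cancels it through `get_task()`. The `Suspendable` class is repeated from above (minus `is_suspended()`) only so the snippet runs on its own. The `worker` coroutine, the `log` list and the timings are assumptions made up for this demonstration. It covers only the case where the wrapper is parked in `Event.wait()` when the cancel arrives.

```python
import asyncio

class Suspendable:
    # Same wrapper as in the answer, repeated so this snippet runs standalone.
    def __init__(self, target):
        self._target = target
        self._can_run = asyncio.Event()
        self._can_run.set()
        self._task = asyncio.ensure_future(self)

    def __await__(self):
        target_iter = self._target.__await__()
        iter_send, iter_throw = target_iter.send, target_iter.throw
        send, message = iter_send, None
        while True:
            try:
                while not self._can_run.is_set():
                    yield from self._can_run.wait().__await__()
            except BaseException as err:
                send, message = iter_throw, err
            try:
                signal = send(message)
            except StopIteration as err:
                return err.value
            else:
                send = iter_send
            try:
                message = yield signal
            except BaseException as err:
                send, message = iter_throw, err

    def suspend(self):
        self._can_run.clear()

    def resume(self):
        self._can_run.set()

    def get_task(self):
        return self._task

async def worker(log):
    # Hypothetical target coroutine, used only for this demonstration.
    try:
        while True:
            log.append("tick")
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append("cancelled")
        raise

async def main():
    log = []
    s = Suspendable(worker(log))
    await asyncio.sleep(0.05)   # let the worker tick a few times
    s.suspend()
    ticks_at_suspend = log.count("tick")
    await asyncio.sleep(0.05)   # worker's short sleep ends; wrapper parks in Event.wait()
    ticks_while_suspended = log.count("tick") - ticks_at_suspend
    task = s.get_task()
    task.cancel()               # cancel without resuming first
    await asyncio.gather(task, return_exceptions=True)
    return log, ticks_while_suspended, task.cancelled()

log, ticks_while_suspended, was_cancelled = asyncio.run(main())
print(ticks_while_suspended, log[-1], was_cancelled)
```

Note that the code waits on the task returned by `get_task()`, not on the `Suspendable` object itself, which is already owned by that task. If the target is blocked on a long wait of its own when the cancel arrives, delivery may be deferred until `resume()` is called, as discussed in the comments on the question.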
user4815162342
  • Thanks for this great solution and the additional links and clarifications! I will edit my post to reflect my final code. – thisisalsomypassword Mar 19 '21 at 14:17
  • Not sure if you need `self._task = asyncio.ensure_future(self)`. In my case it caused `__await__` to be called twice, leading to a "RuntimeError: cannot reuse already awaited coroutine" at `yield signal`. But without `self._task` this code saved my day. :) – Falko Feb 08 '22 at 08:05
  • @Falko Can you provide a pastebin (or similar) with the test code that leads to that exception? I'd like to fix the code, but the test code in the answer doesn't raise. – user4815162342 Feb 08 '22 at 08:15
  • @user4815162342 Oh I see, you don't await the `task`. I was trying something like `async def main(): await Suspendable(heartbeat())`. Here is a more complete example: https://pastebin.com/KCiLLtrH – Falko Feb 08 '22 at 10:02
  • @Falko Thanks for the example. I think the idea is that you don't await the task, but run it in the background, only occasionally suspending/resuming it. The `__await__()` method is an implementation detail where I placed the interception code, which is confusing here because it makes it look like `Suspendable` is awaitable. If one were to remove `ensure_future()`, then the task wouldn't automatically start. But there are probably other ways to start it, so your modification is likely correct. – user4815162342 Feb 08 '22 at 10:22