12

Let's say I have an async generator like this:

async def event_publisher(connection, queue):
    while True:
        if not await connection.is_disconnected():
            event = await queue.get()
            yield event
        else:
            return

I consume it like this:

published_events = event_publisher(connection, queue)
async for event in published_events:
    # do event processing here

It works just fine; however, when the connection is disconnected and no new event is published, the `async for` just waits forever. Ideally, I would therefore like to close the generator forcefully, like this:

if await connection.is_disconnected():
    await published_events.aclose()

But I get the following error:

RuntimeError: aclose(): asynchronous generator is already running

Is there a way to stop processing of an already running generator?

martineau
Szabolcs
  • Does the generator get stuck awaiting `queue.get()` if the disconnection happens just after `await connection.is_disconnected()`? – DurandA Feb 14 '20 at 12:48
  • @DurandA yep, it's waiting for the next event to be added to the queue, that's why the generator is running, while I try to close it. – Szabolcs Feb 14 '20 at 13:24

2 Answers

7

It seems to be related to this issue. Notably:

As shown in https://gist.github.com/1st1/d9860cbf6fe2e5d243e695809aea674c, it's an error to close a synchronous generator while it is being iterated.

...

In 3.8, calling "aclose()" can crash with a RuntimeError. It's no longer possible to reliably cancel a running asynchronous generator.

Well, since we can't cancel a running asynchronous generator itself, let's cancel the step it is currently running instead:

import asyncio
from contextlib import suppress


async def cancel_gen(agen):
    task = asyncio.create_task(agen.__anext__())
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task
    await agen.aclose()  # probably a good idea,
                         # but if you get errors, try commenting this line out

...

if await connection.is_disconnected():
    await cancel_gen(published_events)

I can't test whether it will work, since you didn't provide a reproducible example.
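
For what it's worth, here is a rough, untested harness one could adapt to try it. `FakeConnection`, `consume()` and the timings below are hypothetical stand-ins, while `event_publisher` comes from the question and `cancel_gen` from above:

import asyncio
from contextlib import suppress

# Assumes event_publisher (from the question) and cancel_gen (from above)
# are defined in this module.


class FakeConnection:
    # Hypothetical stub standing in for the real connection object.
    def __init__(self):
        self.disconnected = False

    async def is_disconnected(self):
        return self.disconnected


async def consume(published_events):
    async for event in published_events:
        print("processed", event)


async def main():
    connection = FakeConnection()
    queue = asyncio.Queue()
    published_events = event_publisher(connection, queue)
    consumer = asyncio.create_task(consume(published_events))

    await queue.put("event-1")
    await asyncio.sleep(0.1)        # consumer is now blocked inside queue.get()

    connection.disconnected = True  # simulate the disconnect
    await cancel_gen(published_events)
    with suppress(asyncio.CancelledError):
        await consumer


asyncio.run(main())

If the workaround behaves as intended, this prints `processed event-1` once and then exits instead of hanging on the second `queue.get()`.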

Mikhail Gerasimov
  • Yep this was the workaround I was looking for. Actually `asyncio.create_task` gives the interface to have control over a `Future`. Thanks! – Szabolcs Feb 17 '20 at 14:30
  • I get the same error when calling `loop.shutdown_asyncgens()`. Your answer suggests that with Python 3.8, `aclose()` cannot be called directly anymore, but `shutdown_asyncgens()` does call it. Does it mean that `shutdown_asyncgens()` doesn't work with Python 3.8 anymore? – JonasVautherin May 11 '20 at 11:41
  • Or does it mean that I should call `cancel()` on all the tasks before calling `shutdown_asyncgens()`? That's what I seem to understand from the discussion on the Python bug tracker... The thing is that in my case, I don't use `create_task`, but `ensure_future` for long (infinite) tasks, and I would like to stop them at some point. – JonasVautherin May 11 '20 at 11:43
  • It seems like it works: I do `task = ensure_future`, and then I `task.cancel()` before calling `shutdown_asyncgens()`. – JonasVautherin May 11 '20 at 12:05
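
To make that concrete, here is a minimal sketch of the pattern described in the comments above, with illustrative names `ticker` and `consume` (not from the question): cancel the task that consumes the generator first, and only then shut down the loop's async generators.

import asyncio
from contextlib import suppress


async def ticker(queue):
    # Stand-in for event_publisher: it blocks on queue.get() the same way.
    while True:
        yield await queue.get()


async def consume(agen):
    async for item in agen:
        print("got", item)


async def main():
    queue = asyncio.Queue()
    task = asyncio.ensure_future(consume(ticker(queue)))
    await queue.put(1)
    await asyncio.sleep(0.1)   # the consumer is now blocked inside queue.get()
    task.cancel()              # cancel the consuming task first
    with suppress(asyncio.CancelledError):
        await task


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# With the consumer cancelled, the generator is no longer running,
# so closing it during shutdown should not raise the RuntimeError.
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()

Once the consuming task is cancelled, the generator is no longer "running", so `shutdown_asyncgens()` can close it cleanly.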
1

You can use a timeout on the queue so `is_disconnected()` is polled regularly even when there is no item to pop:

import asyncio


async def event_publisher(connection, queue):
    while True:
        if not await connection.is_disconnected():
            try:
                event = await asyncio.wait_for(queue.get(), timeout=10.0)
            except asyncio.TimeoutError:
                continue  # timed out with no event; loop around and re-check the connection
            yield event
        else:
            return

Alternatively, it is possible to use Queue.get_nowait().
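
For completeness, a rough sketch of that polling variant; the one-second back-off is arbitrary, and `connection` and `queue` are the same objects as in the question:

import asyncio


async def event_publisher(connection, queue):
    while not await connection.is_disconnected():
        try:
            event = queue.get_nowait()
        except asyncio.QueueEmpty:
            # Nothing queued yet: yield control briefly, then re-check the connection.
            await asyncio.sleep(1.0)
            continue
        yield event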

DurandA
  • Yeah I wanted to do that, however it's not a plain `Queue` object but one from `asyncio.queues`, and its `get()` doesn't accept a `timeout` argument. – Szabolcs Feb 14 '20 at 13:49
  • 1
    I did not realize I was not looking at the `asyncio.Queue` documentation. I updated my answer. – DurandA Feb 14 '20 at 14:01
  • Yeah it solves my problem half way, however when I shut down the process, I would like to forcefully close the generator even if the connection is still alive. – Szabolcs Feb 14 '20 at 14:31
  • What version of Python are you using? Could your issue be related to this [Python bug](https://bugs.python.org/issue32526)? – DurandA Feb 14 '20 at 14:48
  • I'm using 3.8.0. This bug seems to be fixed, since my problem is that I get an exception and it's not closing the generator :D – Szabolcs Feb 14 '20 at 15:01
  • Are you calling `published_events.aclose()` from another task? Can you share a minimal example code so we can reproduce your issue? – DurandA Feb 14 '20 at 15:04