0

How can I implement the functionality of asyncio as_completed on anyio?

I have a messagebus that pickup an user command and directs it to appropriate handler. That may generate a new domain event that should be picked up by the bus too. Using asyncio.as_completed I can run multiple tasks concurrently, get the results of each task as they complete, check if a new event was generated and than handle this new event. I would like to use anyio, but don't know how.

This is kinda of what I am doing with asyncio:

import asyncio
import itertools

import anyio


async def handle(event: str):
    await handle_event(event)


async def handle_event(event: str):
    if event == "event":
        coros = [slow_2(), slow_5()]
    else:
        coros = [slow_1(), slow_1()]
    for coro in asyncio.as_completed(coros):
        result = await coro
        new_events = []
        if result == "event":
            new_events.append(["", ""])
        if new_events:
            async with anyio.create_task_group() as tg:
                for event in new_events:
                    tg.start_soon(handle_event, event)


async def spin(msg: str) -> None:
    for char in itertools.cycle(r"\|/-"):
        status = f"\r{char} {msg}"
        print(status, flush=True, end="")
        try:
            await anyio.sleep(0.1)
        except Exception:
            break
    blanks = " " * len(status)
    print(f"\r{blanks}\r", end="")


async def slow_1():
    await anyio.sleep(1)
    print("slow_1")


async def slow_2():
    await anyio.sleep(2)
    print("slow_2")
    return "event"


async def slow_5():
    await anyio.sleep(5)
    print("slow_5")


async def supervisor():
    async with anyio.create_task_group() as tg:
        with anyio.CancelScope(shield=True) as scope:
            tg.start_soon(spin, "thinking!")
            await handle("event")
            tg.cancel_scope.cancel()


if __name__ == "__main__":
    anyio.run(supervisor)
Joaquim
  • 111
  • 1
  • 10

1 Answers1

0

There are a few ways you could do this, but here's one that should have almost the same API:

from collections.abc import Awaitable, Iterable
from typing import TypeVar

import anyio
from anyio import create_memory_object_stream
from anyio.abc import TaskGroup

T = TypeVar("T")


def as_completed(tg: TaskGroup, aws: Iterable[Awaitable[T]]) -> Iterable[Awaitable[T]]:
  send_stream, receive_stream = create_memory_object_stream()

  async def populate_result(a: Awaitable[T]):
    await send_stream.send(await a)

  async def wait_for_result() -> T:
    return await receive_stream.receive()

  for a in aws:
    tg.start_soon(populate_result, a)

  return (wait_for_result() for _ in aws)

async def main():
  async with anyio.create_task_group() as tg:
    coroutines = [slow_1(), slow_2(), slow_3()]
    for coroutine in as_completed(tg, coroutines):
      result = await coroutine
      # do stuff with result

anyio.run(main)

If you don't mind changing the API slightly, we can simplify a bit more:

from collections.abc import Awaitable, Iterable, AsyncIterable
from typing import TypeVar

import anyio
from anyio import create_memory_object_stream
from anyio.abc import TaskGroup

T = TypeVar("T")


def as_completed(tg: TaskGroup, aws: Iterable[Awaitable[T]]) -> AsyncIterable[Awaitable[T]]:
  send_stream, receive_stream = create_memory_object_stream()

  async def populate_result(a: Awaitable[T]):
    await send_stream.send(await a)

  for a in aws:
    tg.start_soon(populate_result, a)

  return receive_stream

async def main():
  async with anyio.create_task_group() as tg:
    coroutines = [slow_1(), slow_2(), slow_3()]
    async for result in as_completed(tg, coroutines):
      # do stuff with result

anyio.run(main)

I chose to use a TypeVar named T everywhere, but you could consider using Any instead. This would mean you can use this with mixed coroutine types.

DISCLAIMER: I haven't actually run this code, but the approach should work just fine with minor modifications if necessary.

Mezuzza
  • 419
  • 4
  • 14
  • You don't need all those `nonlocal`declarations – that statement is only needed for assigning to variables in an outer scope, not when your just reading them. – Arthur Tacca Aug 27 '23 at 21:27
  • Fair point and thanks for pointing it out. I think those were left over from an earlier draft. Edited to fix that. – Mezuzza Sep 02 '23 at 00:06