How can I implement the functionality of asyncio as_completed
on anyio?
I have a messagebus that pickup an user command and directs it to appropriate handler. That may generate a new domain event that should be picked up by the bus too. Using asyncio.as_completed
I can run multiple tasks concurrently, get the results of each task as they complete, check if a new event was generated and than handle this new event. I would like to use anyio, but don't know how.
This is kinda of what I am doing with asyncio:
import asyncio
import itertools
import anyio
async def handle(event: str):
await handle_event(event)
async def handle_event(event: str):
if event == "event":
coros = [slow_2(), slow_5()]
else:
coros = [slow_1(), slow_1()]
for coro in asyncio.as_completed(coros):
result = await coro
new_events = []
if result == "event":
new_events.append(["", ""])
if new_events:
async with anyio.create_task_group() as tg:
for event in new_events:
tg.start_soon(handle_event, event)
async def spin(msg: str) -> None:
for char in itertools.cycle(r"\|/-"):
status = f"\r{char} {msg}"
print(status, flush=True, end="")
try:
await anyio.sleep(0.1)
except Exception:
break
blanks = " " * len(status)
print(f"\r{blanks}\r", end="")
async def slow_1():
await anyio.sleep(1)
print("slow_1")
async def slow_2():
await anyio.sleep(2)
print("slow_2")
return "event"
async def slow_5():
await anyio.sleep(5)
print("slow_5")
async def supervisor():
async with anyio.create_task_group() as tg:
with anyio.CancelScope(shield=True) as scope:
tg.start_soon(spin, "thinking!")
await handle("event")
tg.cancel_scope.cancel()
if __name__ == "__main__":
anyio.run(supervisor)