anyio is a dependency of Starlette and, therefore, of FastAPI. I find it quite convenient to use its task groups to make concurrent requests to external services from one of my API servers.
I would also like to stream the results out as soon as they are ready. fastapi.responses.StreamingResponse could do the trick, but I would need to keep the task group up and running after returning the StreamingResponse, which sounds like it goes against the idea of structured concurrency.
Using an asynchronous generator may look like an obvious solution, but in general yield cannot be used inside a task group context, according to this: https://trio.readthedocs.io/en/stable/reference-core.html#cancel-scopes-and-nurseries
Here is an example of a FastAPI server that seems to work, though it aggregates the responses before returning them:

    import anyio
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse

    app = FastAPI()


    @app.get("/")
    async def root():
        # What to put below?
        result = await main()
        return StreamingResponse(iter(result))


    async def main():
        send_stream, receive_stream = anyio.create_memory_object_stream()
        result = []
        async with anyio.create_task_group() as tg:
            async with send_stream:
                for num in range(5):
                    tg.start_soon(sometask, num, send_stream.clone())
            async with receive_stream:
                async for entry in receive_stream:
                    # What to do here???
                    result.append(entry)
        return result


    async def sometask(num, send_stream):
        await anyio.sleep(1)
        async with send_stream:
            await send_stream.send(f'number {num}\n')


    if __name__ == "__main__":
        import uvicorn
        # Debug-only configuration
        uvicorn.run(app)

So, the question is: is there something similar to @trio_util.trio_async_generator in anyio, or is it possible to use @trio_util.trio_async_generator with FastAPI directly?
Maybe there are other solutions?