anyio is a dependency of Starlette and, therefore, of FastAPI. I find its task groups quite convenient for making concurrent requests to external services from one of my API servers.

I would also like to stream the results out as soon as they are ready. fastapi.responses.StreamingResponse could do the trick, but I would need to keep the task group up and running after returning the StreamingResponse, which sounds like it goes against the idea of structured concurrency.

An asynchronous generator may look like the obvious solution, but in general yield cannot be used inside a task group context, according to the Trio documentation: https://trio.readthedocs.io/en/stable/reference-core.html#cancel-scopes-and-nurseries

Here is an example of a FastAPI server that seems to work, though it aggregates the responses before returning them:

import anyio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse


app = FastAPI()


@app.get("/")
async def root():
    # What to put below?
    result = await main()
    return StreamingResponse(iter(result))


async def main():
    send_stream, receive_stream = anyio.create_memory_object_stream()

    result = []
    async with anyio.create_task_group() as tg:
        async with send_stream:
            for num in range(5):
                tg.start_soon(sometask, num, send_stream.clone())

        async with receive_stream:
            async for entry in receive_stream:
                # What to do here???
                result.append(entry)

    return result


async def sometask(num, send_stream):
    await anyio.sleep(1)
    async with send_stream:
        await send_stream.send(f'number {num}\n')



if __name__ == "__main__":
    import uvicorn
    # Debug-only configuration
    uvicorn.run(app)

So the question is: is there something similar to @trio_util.trio_async_generator in anyio, or is it possible to use @trio_util.trio_async_generator with FastAPI directly?

Maybe there are other solutions?

oguz ismail
Oleksandr Fedorov
  • Have you considered using a queue? They are in many ways similar to generators, albeit heavier, but without `yield`ing. (I cannot test a `fastapi` setup here, so I cannot verify whether it works for your case.) – MisterMiyagi Nov 19 '21 at 16:00
  • Sure. `anyio.create_memory_object_stream()` basically creates something similar to a queue, and if you take a look at the source code of `trio_async_generator`, you will see that exactly this idea is used there. – Oleksandr Fedorov Nov 19 '21 at 16:35
  • Also, if I wanted this functionality today, I would simply port the `trio_async_generator` code to `anyio`; the changes are straightforward. Still, so as not to reinvent the wheel, I wonder whether someone has already solved this in one way or another. – Oleksandr Fedorov Nov 19 '21 at 16:38

1 Answer

import anyio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


@app.get("/")
async def root():
    return StreamingResponse(main())


async def main():
    send_stream, receive_stream = anyio.create_memory_object_stream()

    async with anyio.create_task_group() as tg:
        async with send_stream:
            for num in range(5):
                tg.start_soon(sometask, num, send_stream.clone())

        async with receive_stream:
            async for entry in receive_stream:
                yield entry


async def sometask(num, send_stream):
    async with send_stream:
        for i in range(1000):
            await anyio.sleep(1)
            await send_stream.send(f"number {num}\n")


if __name__ == "__main__":
    import uvicorn

    # Debug-only configuration
    uvicorn.run(app)

Unexpectedly, it works.

phi friday
  • What's unexpected about that? – Matthias Urlichs Nov 22 '21 at 14:19
  • Yes, it works for the happy path. However, there are a number of cases where things can go wrong: the entire task group is cancelled, an exception is raised in one of the jobs or outside of them, or the user closes the connection before the task is finished. The Trio documentation provides details on what may go wrong in such cases: https://trio.readthedocs.io/en/stable/reference-core.html#cancel-scopes-and-nurseries – Oleksandr Fedorov Jan 18 '22 at 10:48