
I have a FastAPI app that posts two requests, one of which takes longer (if it helps, they're Elasticsearch queries and I'm using the AsyncElasticsearch module, which already returns coroutines). This is my attempt:

class my_module:
    search_object = AsyncElasticsearch(url, port)

    async def do_things(self):
        resp1 = await self.search_object.search()  # the longer one
        print(resp1)
        resp2 = await self.search_object.search()  # the shorter one
        print(resp2)
        process(resp2)
        process(resp1)
        do_synchronous_things()
        return thing

app = FastAPI()

@app.post("/")
async def service(user_input):
    result = await my_module().do_things()
    return result

What I observed is that instead of awaiting resp1 in the background, by the time execution reached the first print it was already a full response, as if I hadn't used async at all.

I'm new to Python async. I knew my code wouldn't work, but I don't know how to fix it. As far as I understand, when the interpreter sees `await` it starts the function and then just moves on, which in this case should immediately post the next request. How do I make it do that?

Xuekai Du
  • I think you have something mixed up here. When you use `await` then that *literally* means "stop here and wait until the result has arrived". So naturally, one line after `await search_object.search()` the response is fully available. If you don't want to wait, don't use `await`. – Tomalak Apr 02 '21 at 09:10
  • The only difference between using `await` and using a synchronous function call is that `await` only pauses the current function, and not the entire world. Your program can do other things while any number of functions are waiting for something. But from the function's point of view, `result = await asynchronous_thing()` and `result = synchronous_thing()` behave exactly the same. – Tomalak Apr 02 '21 at 09:22
  • @Tomalak Thanks! So I indeed understood it wrong. Like you said, if `await` pauses the "current function", does it mean every task that is I/O bound needs to be written in a separate function? And how does it come back to it? I guess this is more lower-level, but can I just take for granted it'll happen when finished? – Xuekai Du Apr 02 '21 at 09:35
  • You can execute tasks sequentially by using `await` once per task (`a = await func_a(); b = await func_b()`) or in parallel by shooting off the tasks back-to-back and using `await` once for the group (`tasks = [func_a(), func_b()]; a, b = await asyncio.gather(*tasks)`), where `asyncio.gather()` is the helper that gives you a single awaitable task which completes once all its arguments have completed. – Tomalak Apr 02 '21 at 09:47
  • Does this answer your question? [FastAPI runs api-calls in serial instead of parallel fashion](https://stackoverflow.com/questions/71516140/fastapi-runs-api-calls-in-serial-instead-of-parallel-fashion) – Chris Jan 28 '23 at 07:47

2 Answers


Yes, that's correct: the coroutine won't proceed until the result is ready. You can use asyncio.gather to run tasks concurrently:

import asyncio


async def task(msg):
    print(f"START {msg}")
    await asyncio.sleep(1)
    print(f"END {msg}")

    return msg


async def main():
    await task("1")
    await task("2")

    results = await asyncio.gather(task("3"), task("4"))

    print(results)


if __name__ == "__main__":
    asyncio.run(main())

Test:

$ python test.py
START 1
END 1
START 2
END 2
START 3
START 4
END 3
END 4
['3', '4']

Alternatively, you can use asyncio.as_completed to process results in the order they finish:

for coro in asyncio.as_completed((task("5"), task("6"))):
    earliest_result = await coro
    print(earliest_result)
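To see that gather really overlaps the waits, here is a minimal, self-contained timing check where `asyncio.sleep` stands in for the Elasticsearch calls (`fake_search` and `run_both` are invented names for this sketch, not part of any library):

```python
import asyncio
import time


async def fake_search(delay, label):
    # stand-in for search_object.search(); the sleep simulates network wait
    await asyncio.sleep(delay)
    return label


async def run_both():
    start = time.monotonic()
    # both coroutines are scheduled together; gather awaits the group
    long_resp, short_resp = await asyncio.gather(
        fake_search(0.2, "long"),
        fake_search(0.1, "short"),
    )
    elapsed = time.monotonic() - start
    return long_resp, short_resp, elapsed


if __name__ == "__main__":
    long_resp, short_resp, elapsed = asyncio.run(run_both())
    # the two waits overlap, so elapsed is ~0.2s, not the 0.3s sum
    print(long_resp, short_resp, round(elapsed, 1))
```

Because the sleeps run concurrently, the total time is roughly the longest single wait rather than the sum of both.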

Update Fri 2 Apr 09:25:33 UTC 2021:

asyncio.run has been available since Python 3.7; in earlier versions you have to create and run the event loop manually:

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
HTF

Explanation

The reason your code runs synchronously is that in the do_things function, the code executes as follows:

  1. Schedule search_object.search() to execute
  2. Wait until search_object.search() has finished and get the result
  3. Schedule search_object.search() to execute
  4. Wait until search_object.search() has finished and get the result
  5. Execute (synchronously) process(resp2)
  6. Execute (synchronously) process(resp1)
  7. Execute (synchronously) do_synchronous_things()

What you intended is for steps 1 and 3 to execute before 2 and 4. You can do this easily with the unsync library – here is the documentation.

How you can fix this

from unsync import unsync

class my_module:
    search_object = AsyncElasticsearch(url, port)

    @unsync
    async def search1(self):
        return await self.search_object.search()

    @unsync
    async def search2(self):  # not sure if this is any different to search1
        return await self.search_object.search()

    async def do_things(self):
        task1, task2 = self.search1(), self.search2()  # schedule both tasks immediately
        resp1, resp2 = task1.result(), task2.result()  # block until both tasks have finished
        # you might do a similar trick with the process function to run process(resp2) and process(resp1) concurrently
        process(resp2)
        process(resp1)
        do_synchronous_things()  # if this does not rely on resp1 and resp2, it could also be put into a separate task to speed things up; use the @unsync(cpu_bound=True) decorator for CPU-bound work
        return thing

app = FastAPI()

@app.post("/")
async def service(user_input):
    result = await my_module().do_things()
    return result
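For comparison, the same scheduling can also be done with plain asyncio.gather from the standard library, without an extra dependency. A runnable sketch with the Elasticsearch call replaced by a stand-in coroutine (`fake_search` is a hypothetical name for illustration):

```python
import asyncio


async def fake_search(result, delay):
    # stand-in for `await search_object.search()`
    await asyncio.sleep(delay)
    return result


async def do_things():
    # both coroutines are submitted together; gather awaits the group,
    # so the shorter query does not wait for the longer one to start
    resp1, resp2 = await asyncio.gather(
        fake_search("resp1", 0.2),  # the longer query
        fake_search("resp2", 0.1),  # the shorter one
    )
    return [resp1, resp2]


if __name__ == "__main__":
    print(asyncio.run(do_things()))
```

gather preserves argument order in its results regardless of which coroutine finishes first, so resp1 and resp2 land in the expected variables.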

More information

If you want to learn more about asyncio and asynchronous programming, I recommend this tutorial. There is also a similar case to the one you presented, with a few possible solutions for making the coroutines run concurrently.

PS. Obviously I could not run this code, so you may have to debug it on your own.

Maciek
  • Thanks for your answer! One question: how decorating `search1` and `search2` with `unsync` makes the process different than running them using `asyncio.gather([search1(), search2()])`? – Mattia Paterna Jun 24 '21 at 13:38
  • I also see an alternative way to run sync and/or blocking functions in a concurrent way decorating them with `@aiomisc.threaded` and then again gathering the tasks together. I do not know whether there is any difference in how they run in the two different libraries though. – Mattia Paterna Jun 24 '21 at 13:43
  • From what I know this is pretty much the same. The only difference is that unsync is written in a way that you do not have to worry about loop management and task creation, as these things are done implicitly. So it would probably be a lot easier to use for beginners (including myself) and helps avoid rookie mistakes. – Maciek Jun 24 '21 at 16:00
  • Thanks @Maciek, then I shall use `unsync` too because I am definitely no good in Python async stuff yet :) Thanks for sharing the library! – Mattia Paterna Jun 25 '21 at 08:23