I would like to start a large number of HTTP requests and collect their results once all of them have returned. Sending the requests in a non-blocking fashion is possible with asyncio, but I have problems collecting their results.
I'm aware of solutions such as aiohttp that are made for this specific problem. But the HTTP requests are just an example; my question is how to use asyncio correctly.
On the server side, I have Flask, which answers every request to localhost/ with "Hello World!", but it waits 0.1 seconds before answering. In all my examples, I'm sending 10 requests. Synchronous code should take about 1 second; an asynchronous version could do it in about 0.1 seconds.
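For reference, the server looks roughly like this (a minimal sketch; the exact code doesn't matter, only the 0.1 second delay):

# minimal Flask server used for testing (sketch)
from flask import Flask
from time import sleep

app = Flask(__name__)

@app.route("/")
def hello():
    sleep(0.1)  # simulate a slow response
    return "Hello World!"

app.run()  # serves on http://127.0.0.1:5000/ by default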
On the client side I want to spin up many requests at the same time and collect their results. I'm trying to do this in three different ways. Since asyncio needs an executor to work around blocking code, all of the approaches call loop.run_in_executor.
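As far as I understand it, loop.run_in_executor(None, fn) submits fn to the loop's default ThreadPoolExecutor and gives back an awaitable future, roughly like this (my own sketch, not the actual implementation):

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()

def run_in_executor_sketch(fn):
    # a worker thread calls fn(); the resulting concurrent.futures.Future
    # is wrapped into an asyncio future that can be awaited on the loop
    cf_future = executor.submit(fn)
    return asyncio.wrap_future(cf_future)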
This code is shared between them:
import requests
from time import perf_counter
import asyncio

loop = asyncio.get_event_loop()

async def request_async():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

def request_sync():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text
Approach 1:
Use asyncio.gather() on a list of tasks and then run_until_complete. After reading Asyncio.gather vs asyncio.wait, it seemed like gather would wait on the results. But it doesn't. So this code returns instantly, without waiting for the requests to finish. If I use a blocking function here, this works. Why can't I use an async function?
# approach 1
start = perf_counter()

tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_async))  # <---- using async function!

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)

stop = perf_counter()
print(f"finished {stop - start}")  # 0.003
# approach 1(B)
start = perf_counter()

tasks = []
for i in range(10):
    tasks.append(loop.run_in_executor(None, request_sync))  # <---- using sync function

gathered_tasks = asyncio.gather(*tasks)
results = loop.run_until_complete(gathered_tasks)

stop = perf_counter()
print(f"finished {stop - start}")  # 0.112
Python even warns me that coroutine "request_async" was never awaited.
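As far as I can tell, the warning appears because calling an async function only creates a coroutine object; nothing runs until it is awaited. That is presumably what happens inside the executor thread:

# calling an async function does not run its body,
# it only creates a coroutine object
coro = request_async()
print(type(coro))  # <class 'coroutine'>
coro.close()       # close it to silence the "never awaited" warning in this demo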
At this point, I have a working solution: using a normal (not async) function in an executor. But I would like a solution that works with async function definitions, because I would like to use await inside them (in this simple example that is not necessary, but if I move more code to asyncio, I'm sure it will become important).
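Roughly, this is the shape I'd eventually like request_async to have (some_async_fetch is hypothetical; it just stands for whatever awaitable work ends up in there):

# hypothetical shape of the coroutine I'd like to run
async def request_async():
    r = await some_async_fetch("http://127.0.0.1:5000/")  # placeholder awaitable
    return r.text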
Approach 2:
Python warns me that my coroutines are never awaited. So let's await them. Approach 2 wraps all the code into an outer async function and awaits the result of the gathering. Same problem: it also returns instantly (and gives the same warning):
# approach 2
async def main():
    tasks = []
    for i in range(10):
        tasks.append(loop.run_in_executor(None, request_async))
    gathered_tasks = asyncio.gather(*tasks)
    return await gathered_tasks  # <-------- here I'm waiting on the coroutine

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}")  # 0.0036
This really confused me. I'm waiting on the result of gather. Intuitively, that should be propagated to the coroutines that I'm gathering. But Python still complains that my coroutine is never awaited.
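If my reading is right, results at this point doesn't even contain the response texts; each executor thread just called request_async() and handed back the un-run coroutine object, so that is what gather() collects:

# what approach 2 presumably leaves me with
print(results)
# -> [<coroutine object request_async at 0x...>, ...]  (not "Hello World!" strings)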
I read some more and found: How could I use requests in asyncio? This is pretty much exactly my example: combining requests and asyncio. Which brings me to approach 3:
Approach 3:
Same structure as approach 2, but wait on each task that was given to run_in_executor() individually (surely this counts as awaiting the coroutine):
# approach 3:
# wrapping executor in coroutine
# awaiting every task individually
async def main():
    tasks = []
    for i in range(10):
        task = loop.run_in_executor(None, request_async)
        tasks.append(task)

    responses = []
    for task in tasks:
        response = await task
        responses.append(response)

    return responses

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}")  # 0.004578
My question is: I want to have blocking code in my coroutines and run them in parallel with an executor. How do I get their results?