
I'm trying to make ~300 API calls at the same time, so that I get all the results within a couple of seconds at most.

My pseudo-code looks like this:

import asyncio
import requests

def function_1():
    colors = ['yellow', 'green', 'blue']  # + ~300 other ones
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    res = loop.run_until_complete(get_color_info(colors))

async def get_color_info(colors):
    loop = asyncio.get_event_loop()
    responses = []
    for color in colors:
        print("getting color")
        url = "https://api.com/{}/".format(color)
        data = loop.run_in_executor(None, requests.get, url)
        r = await data
        responses.append(r.json())
    return responses

Doing this, I get `getting color` printed out every second or so, and the code takes forever, so I'm pretty sure the requests don't run simultaneously. What am I doing wrong?

Costantin

  • The ``await`` keyword literally means to *wait for* a result. Any instructions after it are only executed *after* the result is ready. Doing work concurrently requires running *several* coroutines at once, not having one do several things. – MisterMiyagi Oct 28 '18 at 20:19
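
To see the comment's point concretely, here is a minimal, self-contained sketch (the fake_request coroutine and the one-second asyncio.sleep delay are illustrative stand-ins for a real HTTP call): awaiting three requests one after another takes about three seconds, while asyncio.gather() runs them concurrently in about one.

import asyncio
import time

async def fake_request(i):
    # Stand-in for an IO-bound call such as an HTTP request
    await asyncio.sleep(1)
    return i

async def sequential():
    # One coroutine awaits each result in turn: ~3 seconds total
    return [await fake_request(i) for i in range(3)]

async def concurrent():
    # Three coroutines run at once: ~1 second total
    return await asyncio.gather(*(fake_request(i) for i in range(3)))

for func in (sequential, concurrent):
    start = time.perf_counter()
    asyncio.run(func())
    print(func.__name__, f"took {time.perf_counter() - start:.1f}s")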

2 Answers


aiohttp with Native Coroutines (async/await)

Here is a typical pattern that accomplishes what you're trying to do. (Python 3.7+.)

One major change is that you will need to move from requests, which is built for synchronous IO, to a package such as aiohttp that is built specifically to work with async/await (native coroutines):

import asyncio
import aiohttp  # pip install aiohttp aiodns


async def get(
    session: aiohttp.ClientSession,
    color: str,
    **kwargs
) -> dict:
    url = f"https://api.com/{color}/"
    print(f"Requesting {url}")
    resp = await session.request('GET', url=url, **kwargs)
    # Note that this may raise an exception for non-2xx responses
    # You can either handle that here, or pass the exception through
    data = await resp.json()
    print(f"Received data for {url}")
    return data


async def main(colors, **kwargs):
    # Asynchronous context manager.  Prefer this rather
    # than using a different session for each GET request
    async with aiohttp.ClientSession() as session:
        tasks = []
        for c in colors:
            tasks.append(get(session=session, color=c, **kwargs))
        # asyncio.gather() will wait on the entire task set to be
        # completed.  If you want to process results greedily as they come in,
        # loop over asyncio.as_completed()
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results


if __name__ == '__main__':
    colors = ['red', 'blue', 'green']  # ...
    # Either take colors from stdin or make some default here
    asyncio.run(main(colors))  # Python 3.7+
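
Since main() passes return_exceptions=True to asyncio.gather(), failed requests come back as exception objects inside the result list rather than being raised. A small sketch of one way to separate them (the variable names here are arbitrary):

results = asyncio.run(main(colors))
ok = [r for r in results if not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
print(f"{len(ok)} succeeded, {len(failed)} failed")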

There are two distinct elements to this, one being the asynchronous aspect of the coroutines and one being the concurrency introduced on top of that when you specify a container of tasks (futures):

  • You create one coroutine get that uses await with two awaitables: the first being .request and the second being .json. This is the async aspect. The purpose of awaiting these IO-bound responses is to tell the event loop that other get() calls can take turns running through that same routine.
  • The concurrent aspect is encapsulated in await asyncio.gather(*tasks). This maps the awaitable get() call to each of your colors. The result is an aggregate list of returned values. Note that this wrapper waits until all of your responses come in and .json() has been called on each. If, alternatively, you want to process them greedily as they become ready, you can loop over asyncio.as_completed(): each awaitable it yields resolves to the earliest result from the set of the remaining awaitables, as shown in the sketch after this list.
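
For instance, a rough sketch of the as_completed() variant, reusing the get() coroutine and session from above (main_as_completed is a hypothetical name, not part of the code earlier in this answer):

async def main_as_completed(colors, **kwargs):
    async with aiohttp.ClientSession() as session:
        tasks = [get(session=session, color=c, **kwargs) for c in colors]
        # as_completed() yields awaitables in the order they finish
        for fut in asyncio.as_completed(tasks):
            try:
                data = await fut
                print("got result:", data)
            except aiohttp.ClientError as err:
                print("request failed:", err)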

Lastly, take note that asyncio.run() is a high-level "porcelain" function introduced in Python 3.7. In earlier versions, you can mimic it (roughly) like:

# The "full" versions makes a new event loop and calls
# loop.shutdown_asyncgens(), see link above
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main(colors))
finally:
    loop.close()
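
For reference, a closer (still simplified) sketch of what asyncio.run() does internally, based on CPython's asyncio.runners module:

loop = asyncio.new_event_loop()
try:
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main(colors))
finally:
    try:
        # Flush any pending async generators before closing the loop
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        asyncio.set_event_loop(None)
        loop.close()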

Limiting Requests

There are a number of ways to limit the rate of concurrency. For instance, see asyncio.semaphore in async-await function or large numbers of tasks with limited concurrency. One common approach is to wrap each request in an asyncio.Semaphore, sketched below.
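
As a minimal sketch of that pattern, reusing get() and ClientSession from above (the limit of 10 and the bounded_get helper name are arbitrary, illustrative choices):

async def main_limited(colors, limit=10, **kwargs):
    sem = asyncio.Semaphore(limit)

    async def bounded_get(session, color):
        # At most `limit` requests are in flight at any one time
        async with sem:
            return await get(session=session, color=color, **kwargs)

    async with aiohttp.ClientSession() as session:
        tasks = [bounded_get(session, c) for c in colors]
        return await asyncio.gather(*tasks, return_exceptions=True)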

Brad Solomon
  • Good answer. I wish there was a blog post that explained this as succinctly. Introductory texts on the topic are often outdated or downright incorrect. You might want to mention the use of `Semaphore` to limit the number of concurrent requests, which tends to belong to the same pattern. – user4815162342 Oct 27 '18 at 20:58
  • @user4815162342 Here's an article I wrote recently - appreciate any feedback & corrections. https://realpython.com/async-io-python/ – Brad Solomon Jan 31 '19 at 14:04
  • When I tried this, it still did everything in series (i.e. make call and get response _for that call_ immediately) until I put `asyncio.sleep(0)` after the `get`. Is this necessary? – dstandish Dec 10 '20 at 23:37
  • Great article and answer, @BradSolomon! Have there been any updates since 2019? – Joey Baruch Dec 08 '21 at 19:32

This code below runs test() 10 times with asyncio.gather(), printing the numbers from 0 to 99 ten times. Note that because test() contains no await, each scheduled coroutine runs to completion before the next one starts, so the output is not interleaved:

import asyncio

async def test():
    for i in range(0, 100):
        print(i)

async def call_tests():
    tasks = []

    for _ in range(0, 10):
        tasks.append(test())

    await asyncio.gather(*tasks)

asyncio.run(call_tests())

And this code below is the list-comprehension shorthand for the version above, again running test() 10 times and printing the numbers from 0 to 99:

import asyncio

async def test():
    [print(i) for i in range(0, 100)]

async def call_tests():
    tasks = [test() for _ in range(0, 10)]

    await asyncio.gather(*tasks)

asyncio.run(call_tests())

This is the result (truncated):

...
90
91
92
93
94
95
96
97
98
99
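
As an aside, if you want the ten coroutines to actually interleave their output, each loop iteration needs an await point so control returns to the event loop. A minimal variation (the task_id parameter and the asyncio.sleep(0) call are illustrative additions, not part of the answer above):

import asyncio

async def test(task_id):
    for i in range(0, 100):
        print(task_id, i)
        # sleep(0) does no real waiting; it just yields to the event loop
        await asyncio.sleep(0)

async def call_tests():
    await asyncio.gather(*[test(t) for t in range(0, 10)])

asyncio.run(call_tests())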
Super Kai - Kazuya Ito