14

I'm trying to rewrite this Python 2.7 code to the new async world order:

import multiprocessing

def get_api_results(func, iterable):
    pool = multiprocessing.Pool(5)
    for res in pool.map(func, iterable):
        yield res

map() blocks until all results have been computed, so I'm trying to rewrite this as an async implementation that will yield results as soon as they are ready. Like map(), return values must be returned in the same order as iterable. I tried this (I need requests because of legacy auth requirements):

import asyncio
import requests

def get(i):
    r = requests.get('https://example.com/api/items/%s' % i)
    return i, r.json()

async def get_api_results():
    loop = asyncio.get_event_loop()
    futures = []
    for n in range(1, 11):
        futures.append(loop.run_in_executor(None, get, n))
    async for f in futures:
        k, v = await f
        yield k, v

for r in get_api_results():
    print(r)

but with Python 3.6 I'm getting:

  File "scratch.py", line 16, in <module>
    for r in get_api_results():
TypeError: 'async_generator' object is not iterable

How can I accomplish this?

Martijn Pieters
Erik Cederstrand
  • Don't put the event loop in an async code block; async code must be run by the event loop, not the other way around. – Martijn Pieters Dec 28 '16 at 09:56
  • Thanks! Surely, I'm missing something here. All event loop examples I have seen use loop.run_until_complete(get_api_results()) which in my understanding would both make the call blocking and lose the results. – Erik Cederstrand Dec 28 '16 at 10:01
  • You normally would have more coroutines handling the results, with the event loop driving those. – Martijn Pieters Dec 28 '16 at 10:02
  • also, `requests.get()` is a blocking call, not something you can await on. – Martijn Pieters Dec 28 '16 at 10:06
  • Yes, that's why I wrapped it in `loop.run_in_executor()` as suggested in http://stackoverflow.com/questions/22190403/how-could-i-use-requests-in-asyncio – Erik Cederstrand Dec 28 '16 at 10:10
  • Right, but the loop is not *driven* there, the `loop.run_until_complete()` call sits firmly outside of the async code path. I'd make `main()` take a `loop` argument to pass from `loop.run_until_complete(main(loop))`, to be honest. – Martijn Pieters Dec 28 '16 at 10:12
  • -1 for the standard Stack Overflow failure mode of having a highly generic "how to" title on what's in fact a debugging question. This is currently my fourth-highest Google result for *`async generator python`*, but almost useless as a resource for someone who wants to know the answer to the question in the title! A less falsely-generic title would be a significant improvement. – Mark Amery Jan 30 '17 at 17:53

2 Answers

13

Regarding your older (2.7) code: multiprocessing is a powerful drop-in replacement for the much simpler threading module, meant for concurrently processing CPU-intensive tasks where threading does not work so well (because of the GIL). Your code is probably not CPU-bound, since it just needs to make HTTP requests, so threading might have been enough to solve your problem.

However, instead of using threading directly, Python 3+ has a nice module called concurrent.futures that wraps it in a cleaner API via Executor classes. This module is also available for Python 2.7 as an external package (futures).

The following code works on Python 2 and Python 3:

# For python 2, first run:
#
#    pip install futures
#
from __future__ import print_function

import requests
from concurrent import futures

URLS = [
    'http://httpbin.org/delay/1',
    'http://httpbin.org/delay/3',
    'http://httpbin.org/delay/6',
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.coooom/',
]


def fetch(url):
    r = requests.get(url)
    r.raise_for_status()
    return r.content


def fetch_all(urls):
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(fetch, url): url for url in urls}
        print("All URLs submitted.")
        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            if future.exception() is None:
                yield url, future.result()
            else:
                # Optionally log the failure:
                # print('%r generated an exception: %s' % (url, future.exception()))
                yield url, None


for url, s in fetch_all(URLS):
    status = "{:,.0f} bytes".format(len(s)) if s is not None else "Failed"
    print('{}: {}'.format(url, status))

This code uses futures.ThreadPoolExecutor, which is based on threading. A lot of the magic is in the as_completed() call used here.
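One caveat: as_completed() yields results in completion order, not input order. If you need the original map()-style ordering the question asks for, executor.map() gives you that while still running the calls concurrently. A minimal sketch, reusing fetch() and URLS from above (note that, unlike the version above, an exception raised in fetch() propagates while you iterate):

def fetch_all_ordered(urls):
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        # executor.map() yields lazily, in the order of `urls`; each result
        # is produced as soon as it and all earlier ones are ready.
        for url, content in zip(urls, executor.map(fetch, urls)):
            yield url, content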

Your Python 3.6 code above uses run_in_executor(), which runs get() in the loop's default futures.ThreadPoolExecutor, and does not really use asynchronous I/O!

If you really want to go forward with asyncio, you will need to use an HTTP client that supports asyncio, such as aiohttp. Here is an example:

import asyncio

import aiohttp


async def fetch(session, url):
    print("Getting {}...".format(url))
    async with session.get(url) as resp:
        text = await resp.text()
    return "{}: Got {} bytes".format(url, len(text))


async def fetch_all():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, "http://httpbin.org/delay/{}".format(delay))
                 for delay in (1, 1, 2, 3, 3)]
        for task in asyncio.as_completed(tasks):
            print(await task)
    return "Done."


loop = asyncio.get_event_loop()
resp = loop.run_until_complete(fetch_all())
print(resp)
loop.close()

As you can see, asyncio also has an as_completed(), now using real asynchronous I/O, with only one thread in a single process.
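Again, as_completed() gives you completion order. If you need results back in input order, like the original map(), one option is to start all the tasks up front and then await them in order. A minimal sketch, assuming the fetch() coroutine above (fetch_all_ordered is an illustrative name):

async def fetch_all_ordered():
    async with aiohttp.ClientSession() as session:
        # ensure_future() schedules each coroutine as a task immediately,
        # so they all run concurrently...
        tasks = [asyncio.ensure_future(
                     fetch(session, "http://httpbin.org/delay/{}".format(d)))
                 for d in (1, 1, 2, 3, 3)]
        # ...then awaiting in creation order preserves the input ordering.
        for task in tasks:
            print(await task)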

Udi
  • `Since coroutines are generators, it is not possible to use simple "yield"s in them.` It's possible. https://stackoverflow.com/a/37550568/2908138 – im7mortal May 24 '17 at 14:08
  • @im7mortal: thank you, I have removed this part from the answer. – Udi May 25 '17 at 11:15
8

You put your event loop in another coroutine. Don't do that. The event loop is the outermost 'driver' of async code, and should be run synchronously.

If you need to process the fetched results, write more coroutines to do so. They could take the data from a queue, or they could drive the fetching directly.
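For example, a minimal sketch of the queue variant, assuming the get() helper from the question (produce(), consume() and the None sentinel are illustrative choices):

async def produce(loop, queue):
    # Submit every blocking get() call to the default executor up front,
    # then enqueue the results in submission order as they complete.
    futures = [loop.run_in_executor(None, get, n) for n in range(1, 11)]
    for future in futures:
        await queue.put(await future)
    await queue.put(None)  # sentinel: no more results

async def consume(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(item)  # stand-in for real result processing

loop = asyncio.get_event_loop()
queue = asyncio.Queue()
loop.run_until_complete(asyncio.gather(produce(loop, queue), consume(queue)))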

Or you could have a single main function that fetches and processes the results directly, for example:

async def main(loop):
    # Submit all calls to the executor first so they run concurrently,
    # then await the futures in submission order to keep results ordered.
    futures = [loop.run_in_executor(None, get, n) for n in range(1, 11)]
    for future in futures:
        k, v = await future
        # do something with the result

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

I'd make the get() function properly async too, using an async library like aiohttp, so you don't have to use the executor at all.
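A minimal sketch of what that could look like, assuming your legacy auth requirement can be expressed as aiohttp session or request arguments (the session is passed in so it can be shared across requests):

import aiohttp

async def get(session, i):
    # Same example endpoint as in the question.
    async with session.get('https://example.com/api/items/%s' % i) as resp:
        return i, await resp.json()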

Martijn Pieters