
I have a simple class that leverages an async generator to retrieve a list of URLs:

import aiohttp
import asyncio
import logging
import sys

LOOP = asyncio.get_event_loop()
N_SEMAPHORE = 3

FORMAT = '[%(asctime)s] - %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

class ASYNC_GENERATOR(object):
    def __init__(self, n_semaphore=N_SEMAPHORE, loop=LOOP):
        self.loop = loop
        self.semaphore = asyncio.Semaphore(n_semaphore)
        self.session = aiohttp.ClientSession(loop=self.loop)

    async def _get_url(self, url):
        """
        Sends an http GET request to an API endpoint
        """

        async with self.semaphore:
            async with self.session.get(url) as response:
                logger.info(f'Request URL: {url} [{response.status}]')
                read_response = await response.read()

                return {
                    'read': read_response,
                    'status': response.status,
                }

    def get_routes(self, urls):
        """
        Wrapper around _get_url (multiple urls asynchronously)

        This returns an async generator
        """

        # Asynchronous http GET requests
        coros = [self._get_url(url) for url in urls]
        futures = asyncio.as_completed(coros)
        for future in futures:
            yield self.loop.run_until_complete(future)

    def close(self):
        self.session._connector.close()

When I execute this main part of the code:

if __name__ == '__main__':
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    for response in responses:
        response = next(ag.get_routes(['https://httpbin.org/get']))
    ag.close()

The log prints out:

[2018-05-15 12:59:49,228] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 12:59:49,235] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 12:59:49,242] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 12:59:49,285] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 12:59:49,290] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 12:59:49,295] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 12:59:49,335] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 12:59:49,340] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 12:59:49,347] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 12:59:49,388] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 12:59:49,394] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,444] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,503] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,553] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,603] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,650] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,700] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,825] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,875] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,922] - Request URL: https://httpbin.org/get [200]

Since responses is an async generator, I expect it to yield one response from the generator (which should only send the request upon actually yielding), send a separate request to the endpoint with no x parameter, and then yield the next response from the generator. It should flip back and forth between a request with an x parameter and a request with no parameters. Instead, it yields all of the responses with an x parameter first, followed by all of the requests that have no parameters.

Something similar happens when I do:

ag = ASYNC_GENERATOR()
urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
responses = ag.get_routes(urls)
next(responses)
response = next(ag.get_routes(['https://httpbin.org/get']))
ag.close()

And the log prints:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,656] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 13:08:38,681] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 13:08:38,695] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 13:08:38,717] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 13:08:38,741] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 13:08:38,750] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 13:08:38,773] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 13:08:38,792] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 13:08:38,803] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

Instead, what I want is:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

There are times when I want to retrieve all of the responses first before doing anything else. However, there are also times when I want to interject and make intermediate requests before yielding the next item from the generator (e.g., the generator yields paginated search results and I want to follow further links from each page before moving on to the next page).
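For concreteness, here is a rough sketch of the interleaved flow I am after (extract_links and process are hypothetical placeholders for whatever per-page work is needed):

for page in ag.get_routes(page_urls):         # one request per iteration
    follow_ups = extract_links(page['read'])  # hypothetical helper
    for extra in ag.get_routes(follow_ups):   # interject extra requests
        process(extra)                        # hypothetical helper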

What do I need to change to achieve the required result?

  • `responses` is not an async generator; it's a regular, normal, synchronous generator function. To be async, you'd have to use `async def responses(...):` – Martijn Pieters May 15 '18 at 17:18
  • I don't think I follow. Would you mind providing a code snippet? Would `async def responses(...):` sit between `get_routes` and `_get_url`? Or do I use this inside `__main__`? And what would I be awaiting inside `async def responses(...):`? Thank you for your patience as I am still learning this complicated async stuff! – slaw May 15 '18 at 17:44
  • @MartijnPieters The trouble is that the OP is using `as_completed`, which is itself an ordinary generator, but designed (strangely) for use with asyncio. Thus it comes naturally to wrap it using another ordinary generator. Using `run_until_complete` on coroutines yielded (synchronously) by `as_completed` is a [legitimate](https://stackoverflow.com/questions/41901795/create-generator-that-yields-coroutine-results-as-the-coroutines-finish/41901796#41901796) way of using it, though not something I'd recommend. – user4815162342 May 15 '18 at 18:07
  • @user4815162342 I am flexible and willing to learn the best way to go about this if you don't mind offering an answer with the right solution? I cobbled this together using limited knowledge scraped from disparate examples from the web so I'd appreciate any help that I can get. Hopefully, it is clear what I am trying to achieve? – slaw May 15 '18 at 18:21
  • Of course, it just took some time to write it. :) I wanted to clarify to Martijn that converting your code to an async generator is not so easy as long as you are bogged down by `as_completed`, which IMHO should never have been ported to asyncio - its interface makes sense for `concurrent.futures`, but not here. – user4815162342 May 15 '18 at 18:28
  • @slaw: sorry, that was too brief a comment and I had to step away. The term *async generator* has a [very specific meaning in Python](https://www.python.org/dev/peps/pep-0525/); your generator function pushes tasks into the asyncio loop but is not an async generator. You are firing off a series of tasks and then waiting for the next one to be complete; all those tasks are executing cooperatively. – Martijn Pieters May 15 '18 at 19:21
  • @slaw: user4815162342 has you covered as far as your use-case is concerned. – Martijn Pieters May 15 '18 at 19:22

1 Answer


Leaving aside the technical question of whether responses is an async generator (it's not, as Python uses the term), your problem lies in as_completed. as_completed starts a bunch of coroutines in parallel and provides the means to obtain their results as they complete. That the futures run in parallel is not exactly obvious from the documentation (improved in later versions), but it makes sense if you consider that the original concurrent.futures.as_completed works on thread-based futures, which revolve around parallel execution. Conceptually, the same is true of asyncio futures.

Your code obtains only the first (fastest-arriving) result and then starts doing something else, also using asyncio. The remaining coroutines passed to as_completed are not frozen merely because no one is collecting their results - they are doing their jobs in the background, and once done they are ready to be awaited (in your case by the code inside as_completed, which you access using loop.run_until_complete()). I would venture to guess that the URL without parameters takes longer to retrieve than the URL with just the parameter x, which is why it gets printed after all the other coroutines.
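To make this concrete, here is a minimal, self-contained sketch (independent of your class) showing that advancing the as_completed iterator schedules every coroutine as a task up front, so the slower jobs keep running even when only the fastest result is consumed:

import asyncio

async def job(n):
    await asyncio.sleep(n / 10)
    print(f'job {n} finished')  # prints even though no one awaits this job's result
    return n

async def demo():
    # iterating as_completed schedules all three jobs at once
    for fut in asyncio.as_completed([job(n) for n in (1, 2, 3)]):
        result = await fut      # await only the fastest coroutine
        print(f'first result: {result}')
        break
    await asyncio.sleep(1)      # the remaining jobs still finish in the meantime

asyncio.get_event_loop().run_until_complete(demo())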

In other words, those log lines being printed means that asyncio is doing its job and providing the parallel execution you requested! If you don't want parallel execution, then don't ask for it; execute them serially:

def get_routes(self, urls):
    # run each request to completion before yielding; one URL at a time
    for url in urls:
        yield self.loop.run_until_complete(self._get_url(url))
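With that change, the original driver loop interleaves as intended - each iteration completes one parameterized request, then one plain request, before the next URL is touched:

ag = ASYNC_GENERATOR()
urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
for response in ag.get_routes(urls):
    # this plain request now runs before the next x-parameterized one starts
    extra = next(ag.get_routes(['https://httpbin.org/get']))
ag.close()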

But this is a poor way of using asyncio - its main loop is non-reentrant, so to ensure composability, you almost certainly want the loop to be spun just once at top-level. This is typically done with a construct like loop.run_until_complete(main()) or loop.run_forever(). As Martijn pointed out, you could achieve that, while retaining the nice generator API, by making get_routes an actual async generator:

async def get_routes(self, urls):
    for url in urls:
        result = await self._get_url(url)
        yield result

Now you can have a main() coroutine that looks like this:

async def main():
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    async for response in responses:
        # simulate `next` with async iteration
        async for other_response in ag.get_routes(['https://httpbin.org/get']):
            break
    ag.close()

LOOP.run_until_complete(main())
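If the inner async for ... break dance looks clumsy, you can also pull a single item from an async generator inside main() by awaiting its __anext__ method directly (the anext() builtin only arrived in Python 3.10):

other_response = await ag.get_routes(['https://httpbin.org/get']).__anext__()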
  • The `get_routes()` async generator could also add the plain non-query-string URL when the `await self._get_url(url)` call returns a result, perhaps after the `yield result`? That way at least *those* requests would be run in parallel to the synchronous processing of the `urls` list. – Martijn Pieters May 15 '18 at 19:25
  • @MartijnPieters You mean create the task _before_ `yield result` (which is safe because at that point we know the URL is done and the order respected), but actually await (and yield) it afterwards. That's very clever (unless the OP wanted them not to overlap at all), but it breaks the `get_routes` abstraction, and might be a bit too advanced for an asyncio beginner. – user4815162342 May 15 '18 at 19:50
  • Sure, I'd at least change the name of the function, I'm thinking of a general refactor to get closer to what the OP was trying to do. – Martijn Pieters May 15 '18 at 19:51
  • Is there a reason why loop.run_until_complete can't be placed within a function? From an abstraction standpoint, I don't want the user to have to know anything about async or event loops. I've seen the main() function being used in many online examples, but now the user needs to know to call the class, throw it inside a main() func, and pass it to the event loop, when all they want is paginated responses. Also, as an async noob, I'd be happy to study the more clever code if you wanted to include it at the bottom as an update? I'll play with this more but, nonetheless, thank you for your help! – slaw May 15 '18 at 21:08
  • @slaw Sure, If you really don't want the caller to know anything about asyncio, you can place `run_until_complete` in a function. I was just pointing out that it's a suboptimal use of asyncio. Normally asyncio allows one to compose different async code by starting multiple coroutines "in parallel" or integrating them with callback-based futures. You can even have coroutines that don't know of each other, all running in the same event loop. By hiding the use of asyncio inside a function, you have a design that disables that. – user4815162342 May 16 '18 at 05:49
  • @slaw Just to clarify, I'm not saying that your `main` (or equivalent) function should necessarily be async, just that you should expose both an async API and (if needed) a synchronous function that trivially calls `asyncio.get_event_loop().run_until_complete(async_entry_point())`. Having synchronous code that knows about individual lower-level coroutines, and runs `run_until_complete()` in a loop, just seems wrong, design-wise. – user4815162342 May 16 '18 at 05:52
  • The "clever" refactoring that allows additional parallelism suggested by @MartijnPieters only works if you use an async generator, i.e. if you remain inside the event loop while spending the generator. It won't work if you're calling `run_until_complete` in a loop. Plus, the answer is quite long as it is - that optimization is apt material for a different question. – user4815162342 May 16 '18 at 05:57
  • @user4815162342 Thanks for the additional info. Correct me if I'm wrong, but it seems that one must know and collect all of the coroutines before calling `run_until_complete`. Is there a way to have the loop constantly "listening" and asynchronously processing whatever shows up in a queue (the queue could be empty or have one coroutine; more than three would be limited by the semaphore), so that I can interactively add coroutines to the queue on-the-fly? Hopefully, I'm making sense. I think I'm starting to better understand how all of this works! – slaw May 16 '18 at 17:38
  • @slaw Yes, definitely! You can start a background thread, create an event loop and run it using `loop.run_forever()` there. [`asyncio.run_coroutine_threadsafe`](https://docs.python.org/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe) will submit coroutines to the loop from whichever thread you like, and return a `concurrent.futures.Future` which you can use to query (or wait for) the result (a minimal sketch of this pattern follows the thread). – user4815162342 May 16 '18 at 17:56
  • After setting up a separate thread with its own loop, when I run `asyncio.run_coroutine_threadsafe(ag.get_routes(urls), new_loop)` I get `TypeError: A coroutine object is required`. I can do `asyncio.run_coroutine_threadsafe(ag._get_url(urls[0]), new_loop)` (a single request with `_get_url`) and then I am able to access `future.result()`. How do I do this with the async generator, or is it even possible, since `run_coroutine_threadsafe` appears to need a coroutine as input? I tried `async response from responses:` without defining a function wrapper but that is bad syntax. – slaw May 22 '18 at 16:44
  • @slaw I'm not sure I understand the new requirement. Your last comment was about the event loop listening for a request and processing them queue, and that is what `run_coroutine_threadsafe(ag._get_url(...))` does. Why would you even need an asynchronous generator in this design? (Sure, it's possible to pass multiple/streaming results between threads using `queue.Queue` or similar. But going into details of that approaches the "xy problem" where I'm helping you with what you're literally asking for, while losing sight of your actual requirements. :/ Perhaps it's time for a separate question.) – user4815162342 May 22 '18 at 20:03
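For reference, a minimal sketch of the background-loop pattern described in the last few comments (fetch is a hypothetical stand-in for a real aiohttp request):

import asyncio
import threading

# run a dedicated event loop forever in a background thread
new_loop = asyncio.new_event_loop()
threading.Thread(target=new_loop.run_forever, daemon=True).start()

async def fetch(url):
    await asyncio.sleep(0.1)  # hypothetical stand-in for an aiohttp request
    return url

# submit a coroutine to the background loop from any thread;
# run_coroutine_threadsafe returns a concurrent.futures.Future
future = asyncio.run_coroutine_threadsafe(fetch('https://httpbin.org/get'), new_loop)
print(future.result())        # block until the result is available

To consume an async generator through this API, wrap the async for loop in a coroutine and submit that coroutine instead.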