
I have the following method in my Tornado handler:

  async def get(self):
      url = 'url here'
      try:
          async for batch in downloader.fetch(url):
              self.write(batch)
              await self.flush()
      except Exception as e:
          logger.warning(e)

This is the code for downloader.fetch():

async def fetch(url, **kwargs):
    timeout = kwargs.get('timeout', aiohttp.ClientTimeout(total=12))
    response_validator = kwargs.get('response_validator', json_response_validator)
    extractor = kwargs.get('extractor', json_extractor)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(url) as resp:
                response_validator(resp)
                async for batch in extractor(resp):
                    yield batch

    except aiohttp.client_exceptions.ClientConnectorError:
        logger.warning("bad request")
        raise
    except asyncio.TimeoutError:
        logger.warning("server timeout")
        raise

I would like to yield the "batch" objects from multiple downloaders in parallel: whichever downloader has a batch ready first should supply the next one, and so on until all downloaders have finished. Something like this (this is not working code):

async for batch in [downloader.fetch(url1), downloader.fetch(url2)]:
    ....

Is this possible? How can I modify what I am doing in order to be able to yield from multiple coroutines in parallel?

Liviu
  • I've asked a similar [question](https://stackoverflow.com/questions/41790750/writing-files-asynchronously), and while the question itself is different, my code showcases parallel IO (via the `aiofiles` module) – Eli Korvigo Jun 21 '18 at 18:15

2 Answers


How can I modify what I am doing in order to be able to yield from multiple coroutines in parallel?

You need a function that merges two async sequences into one, iterating over both in parallel and yielding elements from one or the other, as they become available. While such a function is not included in the current standard library, you can find one in the aiostream package.

You can also write your own merge function, as shown in this answer:

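import asyncio

async def merge(*iterables):
    # Map each async iterator to its pending __anext__() future (None = not yet scheduled).
    iter_next = {it.__aiter__(): None for it in iterables}
    while iter_next:
        # Schedule __anext__() for every iterator that has no future in flight.
        for it, it_next in iter_next.items():
            if it_next is None:
                fut = asyncio.ensure_future(it.__anext__())
                fut._orig_iter = it
                iter_next[it] = fut
        # Wait until at least one iterator has produced an item or finished.
        done, _ = await asyncio.wait(iter_next.values(),
                                     return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            iter_next[fut._orig_iter] = None
            try:
                ret = fut.result()
            except StopAsyncIteration:
                # This iterator is exhausted; stop tracking it.
                del iter_next[fut._orig_iter]
                continue
            yield ret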

Using that function, the loop would look like this:

async for batch in merge(downloader.fetch(url1), downloader.fetch(url2)):
    ....
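
For comparison, here is a minimal sketch of a different way to get the same effect: one background task per source pushing into a shared `asyncio.Queue`. This is not the `merge` shown above and not the aiostream implementation. The names `merge_via_queue`, `fake_fetch` and `demo` are made up for illustration, and `fake_fetch` only stands in for `downloader.fetch()` by sleeping instead of doing any network I/O.

```python
import asyncio


async def merge_via_queue(*iterables):
    """Yield items from several async iterables as soon as any one has an item."""
    queue = asyncio.Queue()  # unbounded, so producers never block on put

    async def pump(it):
        # Forward every item from one source into the shared queue.
        try:
            async for item in it:
                queue.put_nowait(("item", item))
        except Exception as exc:
            # Hand the error to the consumer instead of losing it in the task.
            queue.put_nowait(("error", exc))
        finally:
            # Always signal that this source will produce nothing more.
            queue.put_nowait(("done", None))

    tasks = [asyncio.ensure_future(pump(it)) for it in iterables]
    remaining = len(tasks)
    try:
        while remaining:
            kind, payload = await queue.get()
            if kind == "done":
                remaining -= 1
            elif kind == "error":
                raise payload
            else:
                yield payload
    finally:
        # Stop any producers still running if the consumer exits early.
        for task in tasks:
            task.cancel()


async def fake_fetch(name, delay, count):
    """Hypothetical stand-in for downloader.fetch(): yields `count` batches."""
    for i in range(count):
        await asyncio.sleep(delay)
        yield f"{name}{i}"


async def demo():
    batches = []
    async for batch in merge_via_queue(fake_fetch("a", 0.1, 3),
                                       fake_fetch("b", 0.25, 2)):
        batches.append(batch)
    return batches


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The trade-off in this sketch is that the queue is unbounded, so a fast source keeps producing even when the consumer is slow. The future-based `merge` above only asks each iterator for its next item after the previous one has been consumed.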
user4815162342
  • Great! Your example worked and I also tested the aiostream version of merge. Just wanted to add that presently, iterating as you've shown over the aiostream version of merge shows a warning: "aiostream/aiter_utils.py:104: UserWarning: AsyncIteratorContext is iterated outside of its context". If anybody knows how to do this properly with aiostream please comment. – Liviu Jun 22 '18 at 09:35
  • @Liviu I haven't actually tried `aiostream`, but according to [this comment](https://stackoverflow.com/questions/50901182/watch-stdout-and-stderr-of-a-subprocess-simultaneously/50903757#comment88836465_50903757), you need to wrap the loop in an `async with`, presumably giving it the merged stream. In the general case this ensures that the streams are correctly finalized (their `finally` statements executed, if they have them) regardless of whether the top-level stream is fully exhausted. – user4815162342 Jun 22 '18 at 10:26
  • @Liviu Have you tried `async with merge(...) as stream:` and inside it `async for batch in stream: ...`? – user4815162342 Jun 22 '18 at 18:44
  • Yes, and it did work eventually. I had to open the context like so: `async with merge(...).stream() as stream: ...` – Liviu Jun 24 '18 at 07:16

Edit: As mentioned in the comments, the method below does not run the given generators in parallel.

Check out the aitertools library.

import asyncio
import aitertools

async def f1():
    await asyncio.sleep(5)
    yield 1

async def f2():
    await asyncio.sleep(6)
    yield 2

async def iter_funcs():
    async for x in aitertools.chain(f2(), f1()):
        print(x)

if __name__ == '__main__':
    # asyncio.run() (Python 3.7+) creates and closes the event loop for us.
    asyncio.run(iter_funcs())

It seems that the functions being iterated must be coroutines (here, async generators).

sardok
  • The OP wants to iterate over the generators _in parallel_, and `chain` will serialize them, like writing two `async for` loops one after the other. When running the code in the answer, 2 is printed before 1, although 1 specifies a shorter sleep. `iter_funcs` takes 11 seconds to run, a sum of the times of the individual loops. If the async iterators were exhausted in parallel, 1 would be printed first, and `iter_funcs` should run for a total of 6 seconds. – user4815162342 Jun 22 '18 at 05:28
  • You are definitely right, thanks for the comment. I'll edit my post. – sardok Jun 22 '18 at 07:49
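
To make the timing argument from the comment concrete, here is a small sketch that drains the same two generators first one after the other (which is what chaining amounts to) and then concurrently. It does not use aitertools. The sleeps are scaled down to 0.5 and 0.6 seconds so it runs quickly, and `sequential`, `concurrent`, `timed` and `compare` are names invented for this illustration.

```python
import asyncio
import time


async def f1():
    await asyncio.sleep(0.5)  # scaled-down stand-in for the 5-second sleep
    yield 1


async def f2():
    await asyncio.sleep(0.6)  # scaled-down stand-in for the 6-second sleep
    yield 2


async def sequential():
    """Drain f2 and then f1, one after the other, as a chain would."""
    seen = []
    for gen in (f2(), f1()):
        async for x in gen:
            seen.append(x)
    return seen


async def concurrent():
    """Drain both generators at once, recording items in arrival order."""
    seen = []

    async def drain(gen):
        async for x in gen:
            seen.append(x)

    await asyncio.gather(drain(f2()), drain(f1()))
    return seen


async def timed(coro_fn):
    """Run coro_fn() and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = await coro_fn()
    return result, time.perf_counter() - start


async def compare():
    return await timed(sequential), await timed(concurrent)


if __name__ == "__main__":
    (seq, seq_t), (con, con_t) = asyncio.run(compare())
    print(f"sequential: {seq} in {seq_t:.1f}s")
    print(f"concurrent: {con} in {con_t:.1f}s")
```

The sequential run takes roughly the sum of the two sleeps and sees 2 before 1. The concurrent run takes roughly the longer of the two sleeps and sees 1 first, which is the behaviour the question asks for.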