
I'm trying to merge a bunch of asynchronous generators in Python 3.7 while still being able to add new async generators during iteration. I'm currently using aiostream to merge my generators:

from asyncio import sleep, run
from aiostream.stream import merge

async def go():
    yield 0
    await sleep(1)
    yield 50
    await sleep(1)
    yield 100

async def main():
    tasks = merge(go(), go(), go())

    async for v in tasks:
        print(v)

if __name__ == '__main__':
    run(main())

However, I need to be able to continue to add to the running tasks once the loop has begun. Something like this:

from asyncio import sleep, run
from aiostream.stream import merge

async def go():
    yield 0
    await sleep(1)
    yield 50
    await sleep(1)
    yield 100

async def main():
    tasks = merge(go(), go(), go())

    async for v in tasks:
        if v == 50:
            tasks.merge(go())
        print(v)

if __name__ == '__main__':
    run(main())

The closest I've got to this is using the aiostream library, but maybe this can also be written fairly neatly with just the native asyncio standard library.

freebie
  • If you are already using `aiostream` to merge the generators, and you have managed to add to running tasks, then why not retain that solution? Merging a _fixed_ number of tasks is possible, but [not trivial](https://stackoverflow.com/a/50975690/1600898) using stock asyncio. You can use that implementation as a starting point if you need to add support for additional iterators. – user4815162342 Jul 18 '18 at 11:34
  • @user4815162342 I'm sorry, I don't understand what you mean. If you are asking why I don't just append the extra go calls before I start iterating, then my answer would be: this is a contrived example simplifying my use-case. I do not know all the tasks to be performed when I start the loop; they are determined by the results of other tasks. Is that what you are asking? – freebie Jul 18 '18 at 11:41
  • Based on the last sentence of your question ("The closest I've got to this is using the `aiostream` library"), I got the impression that you solved the problem using `aiostreams`, so I was wondering why you're not satisfied with that solution? – user4815162342 Jul 18 '18 at 11:50
  • Nope, sorry. I was saying that the closest I got to the functionality I want (merging multiple async generators) was using that library, but it doesn't support adding more tasks later, so it doesn't fully satisfy the requirement. I wanted to mention that I realise aiostream might not be the right tool for the job, but it at least gives me the syntax to express the problem here. – freebie Jul 18 '18 at 11:54

2 Answers


Here is an implementation that should work efficiently even with a large number of async iterators:

import asyncio

class merge:
    def __init__(self, *iterables):
        self._iterables = list(iterables)
        self._wakeup = asyncio.Event()

    def _add_iters(self, next_futs, on_done):
        # Schedule __anext__() for each pending iterable and register the
        # callback that wakes the main loop when a value is ready.
        for it in self._iterables:
            it = it.__aiter__()
            nfut = asyncio.ensure_future(it.__anext__())
            nfut.add_done_callback(on_done)
            next_futs[nfut] = it
        del self._iterables[:]
        return next_futs

    async def __aiter__(self):
        done = {}
        next_futs = {}
        def on_done(nfut):
            done[nfut] = next_futs.pop(nfut)
            self._wakeup.set()

        self._add_iters(next_futs, on_done)
        try:
            while next_futs:
                await self._wakeup.wait()
                self._wakeup.clear()
                for nfut, it in done.items():
                    try:
                        ret = nfut.result()
                    except StopAsyncIteration:
                        continue
                    self._iterables.append(it)
                    yield ret
                done.clear()
                if self._iterables:
                    self._add_iters(next_futs, on_done)
        finally:
            # if the generator exits with an exception, or if the caller stops
            # iterating, make sure our callbacks are removed
            for nfut in next_futs:
                nfut.remove_done_callback(on_done)

    def append_iter(self, new_iter):
        # Add another async iterable while the merged iteration is running.
        self._iterables.append(new_iter)
        self._wakeup.set()

The only change required for your sample code is that the method is named `append_iter`, not `merge`.
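For example, adapting the sample code from the question (a minimal sketch; go is the generator from the question and merge is the class above):

from asyncio import sleep, run

async def go():
    yield 0
    await sleep(1)
    yield 50
    await sleep(1)
    yield 100

async def main():
    tasks = merge(go(), go(), go())

    async for v in tasks:
        if v == 50:
            # append a new generator while iteration is in progress
            tasks.append_iter(go())
        print(v)

if __name__ == '__main__':
    run(main())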

user4815162342

This can be done using `stream.flatten` with an asyncio queue to store the new generators.

import asyncio
from aiostream import stream, pipe

async def main():
    # go() is the async generator defined in the question
    queue = asyncio.Queue()
    await queue.put(go())
    await queue.put(go())
    await queue.put(go())

    xs = stream.call(queue.get)
    ys = stream.cycle(xs)
    zs = stream.flatten(ys, task_limit=5)
    async with zs.stream() as streamer:
        async for item in streamer:
            if item == 50:
                await queue.put(go())
            print(item)

Notice that you may tune the number of tasks that can run at the same time using the `task_limit` argument. Also note that `zs` can be elegantly defined using the pipe syntax:

zs = stream.call(queue.get) | pipe.cycle() | pipe.flatten(task_limit=5)
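
For reference, here is a minimal runnable sketch of the pipe version, assuming the go generator from the question. Note that queue.get waits forever once the queue is empty, so as written the pipeline only ends when you break out of the loop or add your own termination condition (e.g. a sentinel value):

import asyncio
from aiostream import stream, pipe

async def go():
    yield 0
    await asyncio.sleep(1)
    yield 50
    await asyncio.sleep(1)
    yield 100

async def main():
    queue = asyncio.Queue()
    for _ in range(3):
        await queue.put(go())

    # cycle() re-runs stream.call(queue.get) indefinitely; flatten() merges
    # each generator it returns into a single stream of items
    zs = stream.call(queue.get) | pipe.cycle() | pipe.flatten(task_limit=5)
    async with zs.stream() as streamer:
        async for item in streamer:
            if item == 50:
                await queue.put(go())
            print(item)

if __name__ == '__main__':
    asyncio.run(main())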

Disclaimer: I am the project maintainer.

Vincent