
I'm trying to create a simple network monitoring app in Python. It should essentially:

  • Run multiple scripts (in this case, bash commands like "ping" and "traceroute") infinitely and simultaneously
  • Yield each line from the output of each subprocess; each line should then be consumed elsewhere in the program and sent to a Kafka topic
  • Do some extra processing on the topic and send the data to InfluxDB (but that's less relevant - I do it with Faust).

What I did:

I tried using an async generator:

import asyncio


async def run(command: str):
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    while True:
        line = await proc.stdout.readline()
        if not line:  # EOF: the subprocess has exited
            break
        yield line

Then consume it elsewhere in the program:

...
async for output_line in run("some_command"):
    ...  # do something with output_line

This works fine for a single subprocess. However, I'm not sure what to do when I need multiple async generators to run in parallel and be consumed in parallel - something like asyncio.gather, maybe, but for async generators.

What do you think would be the best approach to go about doing this? Upon searching I found the aiostream module, which can merge multiple async generators. I could then yield a tuple with the line and, say, the command I gave, to identify which generator the output line came from. Roughly, I imagine that would look like the sketch below.
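
Something like this (untested sketch; the commands are placeholders):

import asyncio
from aiostream import stream


async def main():
    # stream.merge interleaves lines from both generators as they arrive.
    merged = stream.merge(run("ping 127.0.0.1"), run("traceroute 8.8.8.8"))
    async with merged.stream() as lines:
        async for line in lines:
            ...  # consume each line, from whichever command produced it

asyncio.run(main())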

However, maybe there's a simpler solution, hopefully a native one?

Thanks!

elan2k

2 Answers


What you are looking for is asyncio.gather, which runs multiple awaitable objects concurrently.

To use it, I think your first task is to wrap your parsing code into a single function, like:

async def parse(cmd):
    async for output_line in run(cmd):
        ...  # send output_line to Kafka, etc.

Then in another function/context, wrap the parse with gather:

await asyncio.gather(
    parse("cmd1"),
    parse("cmd2"),
    parse("cmd3"),
)
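
For completeness, a minimal sketch of the wiring, assuming the run generator from the question (the commands are placeholders):

import asyncio


async def main():
    # Each parse() loops over one command's output; gather drives them concurrently.
    await asyncio.gather(
        parse("ping 127.0.0.1"),
        parse("traceroute 8.8.8.8"),
    )

asyncio.run(main())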
  • `gather` is great for awaitables with a single result, but it probably won't work as intended for iterators, as asked here. – Alex Peters Jul 08 '23 at 14:23

Given some number of AsyncIterators, you want to be able to consume them from within a single for loop:

async for line in merge_iterators([
    run("some_command"),
    run("another_command"),
    ...
]):
    ...

preferably without relying on a third-party library.

There are some subtleties to consider:

  • What should happen if one of the iterators fails while the others are still going?
    • Should it abort the loop entirely?
    • Should it trigger some separate logic?
    • Could it just be ignored? *
  • What should happen if one of the iterators exhausts before the others?
    • Should everything stop?
    • Should the rest keep going? *
  • Are all of the iterators returning data of the same type?
    • If so, no further consideration needed. *
    • If not, how should this be managed, and should it really be one loop, or could it be better handled by multiple concurrent ones?

Your use case is well-defined, so I'm confident that you'd accept the choices I've marked with asterisks (*).

A "native" Python solution to this might look like:

from asyncio import FIRST_COMPLETED, Task, create_task, wait
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar


_T = TypeVar("_T")


async def merge_iterators(iterators: Collection[AsyncIterator[_T]]) -> AsyncIterable[_T]:
    """
    Enable consumption of multiple `AsyncIterator`s from within one `for` loop.

    - Drop any iterator that raises, ignoring the exception.
    - Yield until all iterators have exhausted.

    https://stackoverflow.com/q/72445371/4877269
    """

    # Start by obtaining a task for each iterator's next result.
    # Unfortunately, `create_task` doesn't accept pure awaitables.
    # We need something to turn an awaitable into a coroutine...
    async def await_next(iterator: AsyncIterator[_T]) -> _T:
        """Turn an awaitable into a coroutine for `create_task`."""
        return await iterator.__anext__()

    # ...which can then be turned into a task.
    def as_task(iterator: AsyncIterator[_T]) -> Task[_T]:
        return create_task(await_next(iterator))

    # Create a task for each iterator, keyed on the iterator.
    next_tasks = {iterator: as_task(iterator) for iterator in iterators}

    # As iterators are exhausted, they'll be removed from that mapping.
    # Repeat for as long as any are NOT exhausted.
    while next_tasks:
        # Wait until one of the iterators yields (or errors out).
        # This also returns pending tasks, but we've got those in our mapping.
        done, _ = await wait(next_tasks.values(), return_when=FIRST_COMPLETED)

        for task in done:
            # Identify the iterator.
            iterator = next(it for it, t in next_tasks.items() if t == task)

            # Yield the value, or handle the error.
            try:
                yield task.result()
            except StopAsyncIteration:
                # This iterator has exhausted.
                del next_tasks[iterator]
            except Exception:
                # Something else went wrong: drop the iterator. Leaving its
                # finished task in the mapping would make `wait` return it
                # again immediately, spinning forever.
                # In real life, don't ignore the error silently--at least log it!
                del next_tasks[iterator]
            else:
                # The iterator hasn't exhausted or errored out.
                # Queue the next inspection.
                next_tasks[iterator] = as_task(iterator)

    # At this point, all iterators are exhausted.

The same code without comments is perhaps a little less intimidating in size:

from asyncio import FIRST_COMPLETED, Task, create_task, wait
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar


_T = TypeVar("_T")


async def _await_next(iterator: AsyncIterator[_T]) -> _T:
    return await iterator.__anext__()


def _as_task(iterator: AsyncIterator[_T]) -> Task[_T]:
    return create_task(_await_next(iterator))


async def merge_iterators(iterators: Collection[AsyncIterator[_T]]) -> AsyncIterable[_T]:
    next_tasks = {iterator: _as_task(iterator) for iterator in iterators}
    while next_tasks:
        done, _ = await wait(next_tasks.values(), return_when=FIRST_COMPLETED)
        for task in done:
            iterator = next(it for it, t in next_tasks.items() if t == task)
            try:
                yield task.result()
            except StopAsyncIteration:
                del next_tasks[iterator]
            except Exception:
                del next_tasks[iterator]
            else:
                next_tasks[iterator] = _as_task(iterator)
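
To consume this with the `run` generator from the question, a minimal usage sketch (the commands are placeholders, and `run` is assumed to yield `bytes` lines):

import asyncio


async def main():
    async for line in merge_iterators([
        run("ping 127.0.0.1"),
        run("traceroute 8.8.8.8"),
    ]):
        print(line.decode().rstrip())

asyncio.run(main())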
Alex Peters
  • I realise you would prefer this to output a tuple including (say) the command that the output relates to. A more suitable `merge_iterators` function signature for that might look like `async def merge_iterators(iterators: Mapping[str, AsyncIterator[_T]]) -> AsyncIterable[tuple[str, _T]]:`. – Alex Peters Jul 08 '23 at 15:03
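
A hypothetical sketch of that keyed variant (the name `merge_keyed_iterators` is illustrative; `tuple[str, _T]` needs Python 3.9+), repeating the helpers for completeness:

from asyncio import FIRST_COMPLETED, Task, create_task, wait
from typing import AsyncIterable, AsyncIterator, Mapping, TypeVar

_T = TypeVar("_T")


async def _await_next(iterator: AsyncIterator[_T]) -> _T:
    return await iterator.__anext__()


def _as_task(iterator: AsyncIterator[_T]) -> Task[_T]:
    return create_task(_await_next(iterator))


async def merge_keyed_iterators(
    iterators: Mapping[str, AsyncIterator[_T]],
) -> AsyncIterable[tuple[str, _T]]:
    # Key each pending task by its label (e.g. the command string).
    next_tasks = {key: _as_task(it) for key, it in iterators.items()}
    while next_tasks:
        done, _ = await wait(next_tasks.values(), return_when=FIRST_COMPLETED)
        for task in done:
            key = next(k for k, t in next_tasks.items() if t is task)
            try:
                result = task.result()
            except StopAsyncIteration:
                del next_tasks[key]  # this iterator is exhausted
            except Exception:
                del next_tasks[key]  # drop a failed iterator, as above
            else:
                yield key, result
                next_tasks[key] = _as_task(iterators[key])

# Usage: async for cmd, line in merge_keyed_iterators({"ping": run("ping 127.0.0.1")}): ...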