3

Question: After asyncio.as_completed yields a result, how do you get a reference to the original task?

Basically the same as this C# question, except in Python: Getting reference to original Task after ordering Tasks by completion?

Example Problem:

# Takes a list of WebClient objects,
# calls each one simultaneously,
# and yields the results immediately as they arrive
# to a synchronous caller.

def yieldThingsAsTheyArrive(webClients):

    tasks = []
    for webClient in webClients:
        # This is what we want to get a reference to later:
        task = webClient.fetch_thing()  # start long-running request asynchronously
        tasks.append(task)

    loop = asyncio.get_event_loop()
    for future in asyncio.as_completed(tasks):
        thing = loop.run_until_complete(future)  # since our caller is synchronous, wait until the task completes so we can yield the final result instead of a future
        thing.originalWebClient = ???  # This is where we need a reference to the original webClient
        yield thing
drifter
  • I've retracted my answer to study a bit better what happens with `as_completed`. What I don't understand is how your snippet works "from a synchronous caller". For the tasks to complete, somebody needs to run the event loop. Who does that, and where? – user4815162342 Apr 26 '18 at 12:11
  • @user4815162342 - Ah, so far, I've only run this code from unit tests. I assume the test runner is initializing the event loop. It sounds like that's going to be an issue when I try to run it in production. Since this method is called directly from the main function, I suppose I could start a loop in the main module. I'm not sure what best practice is. The app is mostly synchronous, so I don't want async code to creep into everything if I can avoid it. – drifter Apr 26 '18 at 17:09
  • @user4815162342 - I meant to link you to the SO answer that recommended the 'run_until_complete' call in order to process 'as_completed' synchronously. It's the second code snippet here: https://stackoverflow.com/a/41901796/404566 – drifter Apr 26 '18 at 17:24
  • 1
    This is a very interesting use of `as_completed`. It appears that, contrary to its name, `as_completed` doesn't actually yields things "as they are completed" (a `print` before the `yield`, as well as a careful examination of the code, will show that it yields its first element immediately). It is a non-blocking call that yields coroutines that you can yourself await (in whatever way you like) to get the results in completion order. This is why it cannot yield the original future, it doesn't yet *know* the result from which future it will actually obtain. – user4815162342 Apr 27 '18 at 05:28
  • 1
    Possible solutions are to either implement something equivalent to the linked C# code, or (and I'd prefer that one) to replace the use of `as_completed` with a custom generator that calls `run_until_complete(asyncio.wait(remaining_futures))`. I'll write up an answer when I get some time. – user4815162342 Apr 27 '18 at 05:29

1 Answer

6

as_completed is unusual in that it yields neither the futures themselves, as asyncio.wait does, nor their results, as asyncio.gather does. Instead, it yields coroutines that you need to await (in whatever way you like) to get the results in completion order. It cannot yield the futures you pass in because, at the point it yields each coroutine, it doesn't yet know which of the passed futures will complete next.
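
As a quick illustration, here is a minimal, self-contained sketch (the job coroutine, the names and the delays are invented for this example, and it uses asyncio.run for brevity) showing that the awaitables yielded by plain iteration over as_completed are not the task objects you passed in:

import asyncio

async def job(delay, name):
    await asyncio.sleep(delay)
    return name

async def main():
    tasks = [asyncio.ensure_future(job(d, n))
             for d, n in ((0.2, 'slow'), (0.1, 'fast'))]
    for awaitable in asyncio.as_completed(tasks):
        # The yielded object is a new awaitable created by as_completed,
        # not one of the tasks we created above, so any data attached
        # to a task is not reachable from it:
        assert awaitable not in tasks
        print(await awaitable)  # prints 'fast', then 'slow'

asyncio.run(main())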

You can associate arbitrary data by wrapping the task in another future, whose result is the task object (to which you've attached your data). This is essentially equivalent to what the C# code does, only without the static-typing ceremony. Taking the setup from this answer, a runnable example looks like this:

import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

def ordinary_generator():
    loop = asyncio.get_event_loop()

    wrappers = []
    for idx, coro in enumerate((first(), second(), third())):
        task = loop.create_task(coro)
        task.idx = idx + 1
        # Wrap the task in a future that completes when the 
        # task does, but whose result is the task object itself.
        wrapper = loop.create_future()
        task.add_done_callback(wrapper.set_result)
        wrappers.append(wrapper)

    for x in asyncio.as_completed(wrappers):
        # yield completed tasks
        yield loop.run_until_complete(x)

for task in ordinary_generator():
    print(task.result(), task.idx)

The other option, which I would recommend, is to replace iteration over as_completed with a loop that calls asyncio.wait(return_when=FIRST_COMPLETED). This also provides futures as they complete, but without the extra wrapping, and results in slightly more idiomatic asyncio code. We convert each coroutine to a task (with loop.create_task or, equivalently, asyncio.ensure_future), attach data to it, and only then pass it to asyncio.wait(). Since wait returns those same task objects, the attached data is still on them.

def ordinary_generator():
    loop = asyncio.get_event_loop()

    pending = []
    for idx, coro in enumerate((first(), second(), third())):
        task = loop.create_task(coro)
        task.idx = idx + 1
        pending.append(task)

    while pending:
        done, pending = loop.run_until_complete(asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED))
        for task in done:
            yield task
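
As with the first snippet, the caller iterates the generator synchronously; because wait hands back the very task objects created above, the idx attribute (and the result) is available on each yielded task, so the same driver loop works unchanged:

for task in ordinary_generator():
    print(task.result(), task.idx)
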
user4815162342
  • 1
    Brilliant! It works perfectly. I deleted my first comment so not to mislead anyone just scanning as to whether this current solution works. – drifter Apr 27 '18 at 23:22