as_completed
is specific in that it neither yields futures like asyncio.wait
, nor their results like asyncio.gather
. Instead, it yields coroutines that you need to await (in whatever way you like) to get the results in completion order. It cannot yield the futures you pass to it because at that point it doesn't yet know which of the passed futures will complete next.
You can associate arbitrary data by wrapping the task in another future, whose result is the task object (to which you've attached your data). This is essentially equivalent to what the C# code does, only without the static-typing ceremony. Taking the setup from this answer, a runnable example looks like this:
import asyncio
async def first():
await asyncio.sleep(5)
return 'first'
async def second():
await asyncio.sleep(1)
return 'second'
async def third():
await asyncio.sleep(3)
return 'third'
def ordinary_generator():
loop = asyncio.get_event_loop()
wrappers = []
for idx, coro in enumerate((first(), second(), third())):
task = loop.create_task(coro)
task.idx = idx + 1
# Wrap the task in a future that completes when the
# task does, but whose result is the task object itself.
wrapper = loop.create_future()
task.add_done_callback(wrapper.set_result)
wrappers.append(wrapper)
for x in asyncio.as_completed(wrappers):
# yield completed tasks
yield loop.run_until_complete(x)
for task in ordinary_generator():
print(task.result(), task.idx)
The other option, which I would recommend, is to replace iteration over as_completed
with a loop that calls asyncio.wait(return_when=FIRST_COMPLETED)
. This will also provide futures as they are complete, but without needing additional wrapping, and resulting in slightly more idiomatic asyncio code. We call ensure_future
on each coroutine to convert it to a future, attach data to it, and only then pass it to asyncio.wait()
. Since wait
returns those same futures, the attached data is on them.
def ordinary_generator():
loop = asyncio.get_event_loop()
pending = []
for idx, coro in enumerate((first(), second(), third())):
task = loop.create_task(coro)
task.idx = idx + 1
pending.append(task)
while pending:
done, pending = loop.run_until_complete(asyncio.wait(
pending, return_when=asyncio.FIRST_COMPLETED))
for task in done:
yield task