
I'm trying to write something as idiomatic as possible to gather results from futures stored in a dict.

Let's imagine I have the following code:

import asyncio

async def sleep(seconds):
    print(f'sleeping for {seconds} seconds')
    await asyncio.sleep(seconds)
    print(f'finished sleeping {seconds} seconds')


async def run():
    tasks = {
        '4': sleep(4),
        '3': sleep(3),
        '2': sleep(2),
        '1': sleep(1),
    }
    print(await gather_from_dict(tasks))


if __name__ == '__main__':
    asyncio.run(run())

The output I'm expecting is:

sleeping for 2 seconds
sleeping for 1 seconds
sleeping for 4 seconds
sleeping for 3 seconds
finished sleeping 1 seconds
finished sleeping 2 seconds
finished sleeping 3 seconds
finished sleeping 4 seconds
{'4': None, '3': None, '2': None, '1': None}

So far the cleanest solution I've found is:

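from typing import Awaitable, Dict, Hashable


async def gather_from_dict(tasks: Dict[Hashable, Awaitable],
                           return_exceptions=False) -> Dict:
    # gather() returns results in the order the awaitables were passed,
    # so zipping with the keys pairs each key with its own result.
    # The loop argument is omitted: asyncio.gather() no longer accepts it
    # as of Python 3.10.
    results = await asyncio.gather(
        *tasks.values(),
        return_exceptions=return_exceptions
    )
    return dict(zip(tasks.keys(), results))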

Any ideas on how to do this in a simpler way? Thanks!!!

jordixou
  • Updated with suggestions from @vaultah – jordixou Jan 08 '18 at 14:15
  • I think your solution is the cleanest so far : ) I think what can possibly be improved is to await the `dict(zip(tasks.keys(), results))` too, since it's a bit time consuming. – Newskooler Apr 01 '20 at 15:14
  • Great solution! BTW: dicts are now ordered in Python, but it can be changed in the future Python releases. So, probably OrderedDict is more robust. – Mark Mishyn Aug 27 '21 at 13:50
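
On the ordering concern raised in the comments, here is a small sketch, assuming Python 3.7+ (for `asyncio.run`). `asyncio.gather` returns its results in the order the awaitables were passed in, and `keys()` and `values()` of an unmodified dict iterate in matching order, so `zip` should pair each key with its own result regardless of which task finishes first. The `delayed_value` helper, the `demo` function, and the delays are hypothetical, chosen so the tasks complete in reverse order.

```python
import asyncio


async def delayed_value(delay, value):
    # Hypothetical helper: wait `delay` seconds, then return `value`.
    await asyncio.sleep(delay)
    return value


async def gather_from_dict(tasks, return_exceptions=False):
    # Same idea as the question's helper, without the removed loop argument.
    results = await asyncio.gather(*tasks.values(),
                                   return_exceptions=return_exceptions)
    return dict(zip(tasks.keys(), results))


async def demo():
    # The first key belongs to the task that finishes last.
    tasks = {
        'slow': delayed_value(0.03, 'slow result'),
        'medium': delayed_value(0.02, 'medium result'),
        'fast': delayed_value(0.01, 'fast result'),
    }
    return await gather_from_dict(tasks)


demo_result = asyncio.run(demo())
print(demo_result)
```

Each key keeps its own value even though `'fast'` completes before `'slow'`.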

2 Answers

1

I redefined your tasks as a plain list of coroutines and prefer to collect the results from run_until_complete, as in the code below. Note that I return a value from sleep; in your code it actually returns None.

import asyncio


async def sleep(seconds):
    print('sleeping for {seconds} seconds'.format(seconds=seconds))
    await asyncio.sleep(seconds)
    print('finished sleeping {seconds} seconds'.format(seconds=seconds))
    return {seconds: 'value of {seconds}'.format(seconds=seconds)}


if __name__ == '__main__':

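    # get_event_loop() without a running loop is deprecated; create one explicitly.
    loop = asyncio.new_event_loop()

    # asyncio.wait() no longer accepts bare coroutines (Python 3.11+),
    # so wrap each one in a Task first.
    tasks = [loop.create_task(sleep(i)) for i in range(1, 5)]

    finished, _ = loop.run_until_complete(
        asyncio.wait(tasks))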

    result = {}

    for task in finished:
        result.update(task.result())

    print(result)
    loop.close()
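
Since `asyncio.wait` returns the finished tasks as an unordered set, the approach above only works when each coroutine returns its own key. A possible variant, sketched under the assumption of Python 3.7+, keeps the original dict keys by remembering which task belongs to which key. The names `wait_from_dict` and `key_by_task` are hypothetical, not part of asyncio.

```python
import asyncio


async def sleep(seconds):
    await asyncio.sleep(seconds)
    return 'value of {}'.format(seconds)


async def wait_from_dict(coros):
    # Wrap each coroutine in a Task and remember the key it came from.
    key_by_task = {asyncio.ensure_future(coro): key
                   for key, coro in coros.items()}
    # wait() hands back a set of finished tasks in no particular order.
    finished, _ = await asyncio.wait(list(key_by_task))
    # Look each finished task up to recover its key.
    return {key_by_task[task]: task.result() for task in finished}


keyed = asyncio.run(wait_from_dict({'a': sleep(0.02), 'b': sleep(0.01)}))
print(keyed)
```

Unlike `gather`, `task.result()` re-raises any exception from the task here, so error handling would need to be added for coroutines that may fail.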
Menglong Li
0

Assumption: the futures (or coroutines) are nested only inside lists and dicts, not inside tuples or custom objects.

Given some async functions like this:

import asyncio


async def slow(result):
    await asyncio.sleep(0.1)
    return result

async def some_complex_stuff_here():
    return {
        'a': slow(1),
        'b': [
            slow('hello'),
            {
                'c': slow('fna'),
                1: slow('world'),
            }
        ],
    }

You could use the following code to await all of the content:

import inspect


def __gather_futures(container, path, futures):
    if isinstance(container, dict):
        for key, value in container.items():
            sub_path = path + (key, )
            __gather_futures(value, sub_path, futures)

    if isinstance(container, list):
        for idx, value in enumerate(container):
            sub_path = path + (idx,)
            __gather_futures(value, sub_path, futures)

    if inspect.isawaitable(container):
        futures.append((path, container))


async def gather_object_futures(container):
    futures = []
    wrapper = {'content': container}
    __gather_futures(container=wrapper, path=(), futures=futures)
    results = await asyncio.gather(*[f[1] for f in futures])
    for (path, future), result in zip(futures, results):
        current_object = wrapper
        for key in path[:-1]:
            current_object = current_object[key]
        current_object[path[-1]] = result

    return wrapper['content']

To call this you could use:

async def run():
    return await gather_object_futures(await some_complex_stuff_here())

import time
time_start = time.time()
result = asyncio.run(run())

# prints roughly 0.1 (seconds): all futures have been gathered at once
print(time.time() - time_start)

# result contains all of the resolved results
print(result)

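Please note: the inner await in gather_object_futures(await some_complex_stuff_here()) is crucial. some_complex_stuff_here is itself a coroutine function, so it must be awaited first to obtain the container that holds the futures.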
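
As a possible alternative to the path-tracking code above (which writes the results back into the container it was given), here is a minimal recursive sketch that builds a new structure instead. It is a different technique from the one in this answer: it calls `asyncio.gather` at every nesting level, which should still run all leaves concurrently because each nested call is scheduled as its own task. The name `resolve` is hypothetical, and tuple support is an addition beyond the stated assumption.

```python
import asyncio
import inspect
import time


async def slow(result):
    await asyncio.sleep(0.1)
    return result


async def resolve(obj):
    """Return a copy of obj with every awaitable replaced by its result."""
    if isinstance(obj, dict):
        values = await asyncio.gather(*(resolve(v) for v in obj.values()))
        return dict(zip(obj.keys(), values))
    if isinstance(obj, (list, tuple)):
        items = await asyncio.gather(*(resolve(v) for v in obj))
        return tuple(items) if isinstance(obj, tuple) else list(items)
    if inspect.isawaitable(obj):
        return await obj
    # Plain values are passed through unchanged.
    return obj


async def main():
    nested = {
        'a': slow(1),
        'b': [slow('hello'), {'c': slow('fna'), 1: slow('world')}],
    }
    return await resolve(nested)


start = time.time()
resolved = asyncio.run(main())
elapsed = time.time() - start
print(resolved)
print(elapsed)
```

The elapsed time should stay close to a single 0.1 second sleep rather than the sum of all four.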

Nils Ziehn