291

asyncio.gather and asyncio.wait seem to have similar uses: I have a bunch of async things that I want to execute/wait for (not necessarily waiting for one to finish before the next one starts).

Since Python 3.11 there is yet another similar feature, asyncio.TaskGroup.

They use a different syntax, and differ in some details, but it seems very un-pythonic to me to have several functions that have such a huge overlap in functionality.

What am I missing?

mkrieger1
Claude

6 Answers

341

Although similar in general cases ("run and get results for many tasks"), each function has some specific functionality for other cases (and see also TaskGroup for Python 3.11+ below):

asyncio.gather()

Returns a Future instance, allowing high level grouping of tasks:

import asyncio
from pprint import pprint

import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(1, 3))
    print("<", tag)
    return tag


async def main():
    # gather() needs a running event loop, so build the groups inside a coroutine.
    group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
    group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
    group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])

    # gather() returns a Future, so groups can themselves be gathered again.
    all_groups = asyncio.gather(group1, group2, group3)

    return await all_groups


results = asyncio.run(main())

pprint(results)

All tasks in a group can be cancelled by calling group2.cancel() or even all_groups.cancel(). See also gather(..., return_exceptions=True), which returns exceptions as results instead of raising the first one.
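As a minimal sketch of that cancellation behaviour (the `worker` helper and its delays are made up for illustration), cancelling the Future returned by gather() should cancel every child that has not finished yet:

```python
import asyncio


async def worker(tag, delay):
    # Hypothetical helper: sleeps for `delay` seconds, then returns its tag.
    await asyncio.sleep(delay)
    return tag


async def main():
    # Wrap the coroutines in tasks so they can be inspected afterwards.
    tasks = [asyncio.create_task(worker(i, 10)) for i in range(3)]
    group = asyncio.gather(*tasks)

    await asyncio.sleep(0.1)  # give the workers time to start
    group.cancel()            # requests cancellation of every unfinished child

    try:
        await group
    except asyncio.CancelledError:
        pass

    return [t.cancelled() for t in tasks]


cancelled_flags = asyncio.run(main())
print(cancelled_flags)
```

Keeping references to the individual tasks is only needed here to check their state; in normal use the group Future is enough.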

asyncio.wait()

Can stop waiting as soon as the first task is done, or after a specified timeout, which gives lower-level control over the operations:

import asyncio
import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(0.5, 5))
    print("<", tag)
    return tag


async def main():
    # wait() requires Task/Future objects (bare coroutines are rejected since Python 3.11).
    tasks = [asyncio.create_task(coro(i)) for i in range(1, 11)]

    print("Get first result:")
    finished, unfinished = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)

    for task in finished:
        print(task.result())
    print("unfinished:", len(unfinished))

    print("Get more results in 2 seconds:")
    finished2, unfinished2 = await asyncio.wait(unfinished, timeout=2)

    for task in finished2:
        print(task.result())
    print("unfinished2:", len(unfinished2))

    print("Get all other results:")
    if unfinished2:  # wait() raises ValueError on an empty set
        finished3, _ = await asyncio.wait(unfinished2)
        for task in finished3:
            print(task.result())


asyncio.run(main())

TaskGroup (Python 3.11+)

Update: Python 3.11 introduces TaskGroups which can "automatically" await more than one task without gather() or wait():

# Python 3.11+ ONLY!
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro(...))
        task2 = tg.create_task(another_coro(...))
    print("Both tasks have completed now.")
Udi
  • 18
    "The single asterisk form ( *args ) is used to pass a non-keyworded, variable-length argument list, and the double asterisk form is used to pass a keyworded, variable-length argument list" – laycat Jan 20 '18 at 05:25
  • 3
    In the `asyncio.gather()` code, If the code that creates those three groups is contained within a function body, you can get rid of the `loop = asyncio.get_event_loop()` and refactor the code adding an `await` to the `asyncio.gather(group1, group2, group3)` making it slightly simpler, and all the lines related with the loop variables will no longer be needed – Yassine Nacif Apr 22 '22 at 15:10
  • Dear Udi, in your *asyncio.gather* example, whenever I run in my computer, I get the "RuntimeError: This event loop is already running", even though the results are properly evaluated. How can one solve this RuntimeError? Shouldn't we use the loop object? – Philipe Riskalla Leal Sep 28 '22 at 00:15
  • @PhilipeRiskallaLeal: copy the text into a `demo.py` file and execute it from the command line using `python demo.py` – Udi Oct 02 '22 at 14:41
  • `run_until_complete` causes issues when there already is an async loop running. What alternative is there to await for the tasks to finish **synchronously**? – theberzi Feb 01 '23 at 13:47
86

A very important distinction, which is easy to miss, is the default behavior of these two functions, when it comes to exceptions.


I'll use this example to simulate a coroutine that will raise exceptions, sometimes -

import asyncio
import random


async def a_flaky_tsk(i):
    await asyncio.sleep(i)  # bit of fuzz to simulate a real-world example

    if i % 2 == 0:
        print(i, "ok")
    else:
        print(i, "crashed!")
        raise ValueError

coros = [a_flaky_tsk(i) for i in range(10)]

await asyncio.gather(*coros) outputs -

0 ok
1 crashed!
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 20, in <module>
    asyncio.run(main())
  File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 17, in main
    await asyncio.gather(*coros)
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError

As you can see, the coroutines after index 1 never got to finish. gather() does not cancel them: the Future returned by gather() is done as soon as the first exception propagates (unlike wait()), so main() returns and the program terminates. If you keep the loop alive, the other coroutines still get a chance to run:

async def main():
    coros = [a_flaky_tsk(i) for i in range(10)]
    await asyncio.gather(*coros)
    

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.create_task(main())
    loop.run_forever()

# 0 ok
# 1 crashed!
# Task exception was never retrieved
#  ....
# 2 ok
# 3 crashed!
# 4 ok
# 5 crashed!
# 6 ok
# 7 crashed!
# 8 ok
# 9 crashed!


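But await asyncio.wait(coros) keeps waiting until every task has finished, even if some of them fail (the default is return_when=ALL_COMPLETED, so unlike gather() it does not return at the first exception). On Python 3.11+, wrap each coroutine in asyncio.create_task() first, because wait() no longer accepts bare coroutines -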

0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
Task exception was never retrieved
future: <Task finished name='Task-10' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-8' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-9' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError

Of course, this behavior can be changed for both by using -

asyncio.gather(..., return_exceptions=True)

or,

asyncio.wait([...], return_when=asyncio.FIRST_EXCEPTION)
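To illustrate the second option, here is a small sketch (the `flaky` coroutine and its delays are assumptions chosen for the demo) in which wait() returns as soon as one task raises, and the caller then decides what to do with the tasks that are still pending:

```python
import asyncio


async def flaky(i):
    # Hypothetical task: odd inputs fail, even inputs succeed.
    await asyncio.sleep(i * 0.1)
    if i % 2:
        raise ValueError(i)
    return i


async def main():
    tasks = [asyncio.create_task(flaky(i)) for i in range(4)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

    # wait() never cancels anything itself; clean up the leftovers explicitly.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    # Calling exception() also marks the exception as retrieved.
    errors = [repr(t.exception()) for t in done if t.exception() is not None]
    return len(done), len(pending), errors


n_done, n_pending, errors = asyncio.run(main())
print(n_done, n_pending, errors)
```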


But it doesn't end here!

Notice: Task exception was never retrieved in the logs above.

asyncio.wait() won't re-raise exceptions from the child tasks until you await them individually. (The stack traces in the logs are just messages; they cannot be caught!)

tasks = [asyncio.create_task(c) for c in coros]  # wait() needs tasks on Python 3.11+
done, pending = await asyncio.wait(tasks)
for tsk in done:
    try:
        await tsk
    except Exception as e:
        print("I caught:", repr(e))

Output -

0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()

On the other hand, to catch exceptions with asyncio.gather(), you must -

results = await asyncio.gather(*coros, return_exceptions=True)
for result_or_exc in results:
    if isinstance(result_or_exc, Exception):
        print("I caught:", repr(result_or_exc))

(Same output as before)

Jemshit
Dev Aggarwal
  • 13
    I never understood `Task exception was never retrieved` error until I came across this post. Thanks a lot for great explanation.. – Saurav Kumar Jun 18 '21 at 15:26
  • 2
    @SauravKumar me too! Heck, this is so helpful!! – pepoluan Jan 04 '22 at 14:48
  • To help someone to understand. `Task exception was never retrieved` is shown where there are no references left to the task object (right before destroying). Python notifies you about exception in the task because you will never be able to gain acces to it later. – Den Avrondo Jun 14 '23 at 15:20
79

asyncio.wait is more low level than asyncio.gather.

As the name suggests, asyncio.gather mainly focuses on gathering the results. It waits on a bunch of futures and returns their results in a given order.

asyncio.wait just waits on the futures. And instead of giving you the results directly, it gives done and pending tasks. You have to manually collect the values.

Moreover, with wait you can specify whether to wait for all futures to finish or just the first one.
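The difference in how results come back can be sketched as follows (the `square` coroutine is a made-up example). gather() returns a list in input order, while wait() hands back sets of tasks whose results you collect yourself:

```python
import asyncio


async def square(n):
    # Later inputs finish earlier, to show that gather() still keeps input order.
    await asyncio.sleep((3 - n) * 0.05)
    return n * n


async def main():
    # gather(): results come back as a list, in the order the awaitables were passed.
    ordered = await asyncio.gather(*(square(n) for n in range(3)))

    # wait(): you get (done, pending) sets of tasks and read each result manually.
    tasks = [asyncio.create_task(square(n)) for n in range(3)]
    done, pending = await asyncio.wait(tasks)
    collected = sorted(t.result() for t in done)

    return ordered, collected, len(pending)


ordered, collected, n_pending = asyncio.run(main())
print(ordered, collected, n_pending)
```

The `sorted()` call is only there because sets have no defined order.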

E. Körner
ospider
  • 9
    @Kingname ..wat – Matt Joiner Jul 23 '20 at 01:02
  • 2
    do you mean that asyncio.gather will have to wait for all of them to complete, while asyncio.wait will return to you the current status of each one (pending or not)? Reading your answer is not clear to me – EigenFool May 12 '21 at 07:30
  • 2
    @EigenFool As of Python 3.9, `asyncio.wait` has a parameter called `return_when`, which you can use to control when the event loop should yield back to you. `asyncio.gather` does not have such parameter, the event loop only get back to you when all tasks have finished/failed. Read the official docs here: https://docs.python.org/3/library/asyncio-task.html#asyncio.wait – ospider May 12 '21 at 08:18
  • 3
    @ospider The parameter called `return_when` for `asyncio.wait` is already available in Python 3.5.9! See here: https://docs.python.org/3.5/library/asyncio-task.html#asyncio.wait – e.d.n.a May 27 '21 at 18:28
  • 2
    @Kingname `python -m timeit "print('hello')"` gives 36.6 usec per loop, so 10000000000000 `print('hello')` will take 11.6 years to complete for just `print()` function – Karol Zlot Jun 22 '21 at 02:46
23

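I also noticed that you can pass a group of awaitables to wait() by simply specifying a list (since Python 3.11 the items must be tasks or futures, not bare coroutines):

result = loop.run_until_complete(asyncio.wait([
        loop.create_task(say('first hello', 2)),
        loop.create_task(say('second hello', 1)),
        loop.create_task(say('third hello', 4))
    ]))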

Whereas grouping in gather() is done by just specifying multiple coroutines:

result=loop.run_until_complete(asyncio.gather(
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ))
Johny Ebanat
  • 36
    Lists can also be used with `gather()`, e.g.: `asyncio.gather(*task_list)` – tehfink Mar 12 '18 at 15:15
  • 5
    So can generators – Jab Feb 21 '19 at 01:08
  • 2
    How can you use this gather without blocking the rest of the script? – thebeancounter Mar 18 '20 at 13:50
  • 2
    Awesome. Thanks for the dramatically easier to read example. – Yablargo Apr 28 '21 at 04:51
  • 5
    @thebeancounter You don't need to `await` right-away! `group = asyncio.gather(*aws)` **returns an awaitable/future for the group directly**, which represents all the combined tasks. The tasks can run soon after the `asyncio.gather`-call, e.g. when there is an `await` for something else (like `asyncio.sleep`) or when accessing the future (like `group.done()`). You only need to use `await group`, when you want to make sure the tasks are done or cancelled and to collect all the results. – e.d.n.a May 27 '21 at 18:46
21

In addition to all the previous answers, I would like to describe how gather() and wait() behave differently when they are cancelled.

Gather() cancellation

If gather() is cancelled, all submitted awaitables (that have not completed yet) are also cancelled.

Wait() cancellation

If the wait()ing task is cancelled, it simply raises a CancelledError and the waited-on tasks remain intact.

Simple example:

import asyncio


async def task(arg):
    await asyncio.sleep(5)
    return arg


async def cancel_waiting_task(work_task, waiting_task):
    await asyncio.sleep(2)
    waiting_task.cancel()
    try:
        await waiting_task
        print("Waiting done")
    except asyncio.CancelledError:
        print("Waiting task cancelled")

    try:
        res = await work_task
        print(f"Work result: {res}")
    except asyncio.CancelledError:
        print("Work task cancelled")


async def main():
    print("asyncio.wait()")
    work_task = asyncio.create_task(task("done"))
    waiting = asyncio.create_task(asyncio.wait({work_task}))
    await cancel_waiting_task(work_task, waiting)

    print("----------------")
    print("asyncio.gather()")
    work_task = asyncio.create_task(task("done"))
    waiting = asyncio.gather(work_task)
    await cancel_waiting_task(work_task, waiting)


asyncio.run(main())

Output:

asyncio.wait()
Waiting task cancelled
Work result: done
----------------
asyncio.gather()
Waiting task cancelled
Work task cancelled

Application example

Sometimes it becomes necessary to combine wait() and gather() functionality. For example, we want to wait for the completion of at least one task and then cancel the remaining pending tasks, and if the waiting itself is cancelled, cancel all pending tasks as well.

As real-world examples, say we have a disconnect event and a work task: we want to wait for the result of the work task, but cancel it if the connection is lost. Or we make several parallel requests and, as soon as one response arrives, cancel all the others.

It could be done this way:

import asyncio
from typing import Optional, Tuple, Set


async def wait_any(
        tasks: Set[asyncio.Future], *, timeout: Optional[int] = None,
) -> Tuple[Set[asyncio.Future], Set[asyncio.Future]]:
    tasks_to_cancel: Set[asyncio.Future] = set()
    try:
        done, tasks_to_cancel = await asyncio.wait(
            tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
        )
        return done, tasks_to_cancel
    except asyncio.CancelledError:
        tasks_to_cancel = tasks
        raise
    finally:
        for task in tasks_to_cancel:
            task.cancel()


async def task():
    await asyncio.sleep(5)


async def cancel_waiting_task(work_task, waiting_task):
    await asyncio.sleep(2)
    waiting_task.cancel()
    try:
        await waiting_task
        print("Waiting done")
    except asyncio.CancelledError:
        print("Waiting task cancelled")

    try:
        res = await work_task
        print(f"Work result: {res}")
    except asyncio.CancelledError:
        print("Work task cancelled")


async def check_tasks(waiting_task, working_task, waiting_conn_lost_task):
    try:
        await waiting_task
        print("waiting is done")
    except asyncio.CancelledError:
        print("waiting is cancelled")

    try:
        await waiting_conn_lost_task
        print("connection is lost")
    except asyncio.CancelledError:
        print("waiting connection lost is cancelled")

    try:
        await working_task
        print("work is done")
    except asyncio.CancelledError:
        print("work is cancelled")


async def work_done_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def conn_lost_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await asyncio.sleep(2)
    connection_lost_event.set()  # <---
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def cancel_waiting_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await asyncio.sleep(2)
    waiting_task.cancel()  # <---
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def main():
    print("Work done")
    print("-------------------")
    await work_done_case()
    print("\nConnection lost")
    print("-------------------")
    await conn_lost_case()
    print("\nCancel waiting")
    print("-------------------")
    await cancel_waiting_case()


asyncio.run(main())

Output:

Work done
-------------------
waiting is done
waiting connection lost is cancelled
work is done

Connection lost
-------------------
waiting is done
connection is lost
work is cancelled

Cancel waiting
-------------------
waiting is cancelled
waiting connection lost is cancelled
work is cancelled
alex_noname
-3

You are correct that asyncio.gather() and asyncio.wait() have similar uses. Both functions are used to execute multiple coroutines concurrently. However, there are some differences between the two functions.

asyncio.gather() is used to execute multiple coroutines concurrently and wait for them all to complete. It returns the results of all the coroutines as a list in the order in which they were passed to the function. If any of the coroutines raises an exception, asyncio.gather() propagates the first such exception to the caller (unless return_exceptions=True is passed, in which case exceptions are returned in the results list).

asyncio.wait() is used to wait for one or more coroutines to complete. It returns two sets of tasks: one set of tasks that have completed and another set of tasks that have not completed. You can use this function to wait for a specific condition in a collection of tasks, such as all complete, the first to complete, or the first to fail.

So, while both functions can be used for similar purposes, they have different use cases. You can use asyncio.gather() when you want to execute multiple coroutines concurrently and wait for them all to complete. You can use asyncio.wait() when you want to wait for one or more coroutines to complete.

  • 1
    Welcome to Stack Overflow! This answer, along with the other two you posted today, appears likely to have been written (entirely or partially) by AI (e.g., ChatGPT). Please be aware that [posting of AI-generated content is banned here](//meta.stackoverflow.com/q/421831). If you used an AI tool to assist with any answer, I would encourage you to delete it. – NotTheDr01ds Jun 03 '23 at 16:40
  • 1
    **Readers should review this answer carefully and critically, as AI-generated information often contains fundamental errors and misinformation.** If you observe quality issues and/or have reason to believe that this answer was generated by AI, please leave feedback accordingly. The moderation team can use your help to identify quality issues. – NotTheDr01ds Jun 03 '23 at 16:40