The Python Oracle

Asyncio.gather vs asyncio.wait

Full question
https://stackoverflow.com/questions/4223...

Question links:
[asyncio.gather]: https://docs.python.org/3/library/asynci...
[asyncio.wait]: https://docs.python.org/3/library/asynci...

Accepted answer links:
[asyncio.gather()]: https://docs.python.org/3/library/asynci...
[asyncio.wait()]: https://docs.python.org/3/library/asynci...
[TaskGroup]: https://docs.python.org/3/library/asynci...

--

Content licensed under CC BY-SA
https://meta.stackexchange.com/help/lice...

--

Tags
#python #asynchronous #asyncawait #pythonasyncio




ACCEPTED ANSWER

Score 373


Although similar in general cases ("run and get results for many tasks"), each function has some specific functionality for other cases (and see also TaskGroup for Python 3.11+ below):

asyncio.gather()

Returns a Future instance, allowing high level grouping of tasks:

import asyncio
import random
from pprint import pprint


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(1, 3))
    print("<", tag)
    return tag


async def main():
    # Call gather() while the event loop is running; calling it without
    # a running loop is deprecated on recent Python versions.
    group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
    group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
    group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])

    # The futures returned by gather() can themselves be gathered.
    all_groups = asyncio.gather(group1, group2, group3)
    return await all_groups


results = asyncio.run(main())

pprint(results)

All tasks in a group can be cancelled by calling group2.cancel() or even all_groups.cancel(). See also .gather(..., return_exceptions=True).
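As a minimal sketch of the cancellation behaviour described above (the coroutine name worker and the timings are made up for illustration), cancelling the future returned by gather() cancels every child it wraps:

```python
import asyncio


async def worker(tag, cancelled):
    # Hypothetical coroutine: records its tag if it gets cancelled.
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        cancelled.append(tag)
        raise


async def cancel_group():
    cancelled = []
    group = asyncio.gather(*(worker(i, cancelled) for i in range(3)))
    await asyncio.sleep(0.1)  # give the children time to start
    group.cancel()            # requests cancellation of every child
    try:
        await group
    except asyncio.CancelledError:
        pass
    return sorted(cancelled)


cancelled_tags = asyncio.run(cancel_group())
print(cancelled_tags)
```

Each child sees the CancelledError at its current await, so it can clean up before re-raising.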

asyncio.wait()

Can stop waiting after the first task is done, or after a specified timeout, allowing lower-level control over operations:

import asyncio
import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(0.5, 5))
    print("<", tag)
    return tag


async def main():
    # wait() needs Task/Future objects; bare coroutines are rejected
    # since Python 3.11.
    tasks = [asyncio.create_task(coro(i)) for i in range(1, 11)]

    print("Get first result:")
    finished, unfinished = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)

    for task in finished:
        print(task.result())
    print("unfinished:", len(unfinished))

    print("Get more results in 2 seconds:")
    finished2, unfinished2 = await asyncio.wait(unfinished, timeout=2)

    for task in finished2:
        print(task.result())
    print("unfinished2:", len(unfinished2))

    if unfinished2:  # wait() raises ValueError on an empty set
        print("Get all other results:")
        finished3, unfinished3 = await asyncio.wait(unfinished2)

        for task in finished3:
            print(task.result())


asyncio.run(main())

TaskGroup (Python 3.11+)

Update: Python 3.11 introduces TaskGroups, which can "automatically" await more than one task without gather() or wait():

# Python 3.11+ ONLY!
async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro(...))
        task2 = tg.create_task(another_coro(...))
    print("Both tasks have completed now.")



ANSWER 2

Score 104


A very important distinction, which is easy to miss, is the default behavior of these two functions when it comes to exceptions.


I'll use this example to simulate a coroutine that sometimes raises an exception -

import asyncio
import random


async def a_flaky_tsk(i):
    await asyncio.sleep(i)  # bit of fuzz to simulate a real-world example

    if i % 2 == 0:
        print(i, "ok")
    else:
        print(i, "crashed!")
        raise ValueError

coros = [a_flaky_tsk(i) for i in range(10)]

await asyncio.gather(*coros) outputs -

0 ok
1 crashed!
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 20, in <module>
    asyncio.run(main())
  File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 17, in main
    await asyncio.gather(*coros)
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError

As you can see, the coros after index 1 never got to finish. The Future returned by gather() is done as soon as the first exception is raised (unlike wait()), so main() returns and the program terminates. gather() does not cancel the remaining coroutines, though: if you keep the program alive, they still get a chance to run:

async def main():
    coros = [a_flaky_tsk(i) for i in range(10)]
    await asyncio.gather(*coros)
    

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.create_task(main())
    loop.run_forever()

# 0 ok
# 1 crashed!
# Task exception was never retrieved
#  ....
# 2 ok
# 3 crashed!
# 4 ok
# 5 crashed!
# 6 ok
# 7 crashed!
# 8 ok
# 9 crashed!
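The same point can be checked without run_forever(). The following is a sketch under assumed names (flaky, survivors) and shortened delays: it catches the first error from gather() and then waits for the tasks that gather() left running:

```python
import asyncio


async def flaky(i, finished):
    # Hypothetical coroutine: odd inputs fail, even inputs record themselves.
    await asyncio.sleep(i * 0.01)
    if i % 2:
        raise ValueError(i)
    finished.append(i)


async def survivors():
    finished = []
    tasks = [asyncio.create_task(flaky(i, finished)) for i in range(4)]
    try:
        await asyncio.gather(*tasks)
    except ValueError as exc:
        first_error = exc.args[0]
    # gather() has already raised, but it did not cancel the other tasks.
    await asyncio.wait(tasks)
    return first_error, finished


first_error, finished = asyncio.run(survivors())
print(first_error, finished)
```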


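But await asyncio.wait(coros) keeps waiting until every task has finished, even if some of them fail (the Future returned by wait() is not done at the first exception, unlike gather()). Note that since Python 3.11, wait() no longer accepts bare coroutines, so wrap each one with asyncio.create_task() first -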

0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
Task exception was never retrieved
future: <Task finished name='Task-10' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-8' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-9' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError

Of course, this behavior can be changed for both by using -

asyncio.gather(..., return_exceptions=True)

or,

asyncio.wait([...], return_when=asyncio.FIRST_EXCEPTION)
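To illustrate the second option, here is a minimal sketch (the coroutine job and its delays are invented for the example). With return_when=asyncio.FIRST_EXCEPTION, wait() returns as soon as one task raises and leaves the rest pending, so cancelling them is the caller's job:

```python
import asyncio


async def job(delay, fail=False):
    # Hypothetical coroutine: sleeps, then optionally raises.
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(delay)
    return delay


async def stop_on_first_error():
    tasks = [
        asyncio.create_task(job(0.01)),
        asyncio.create_task(job(0.05, fail=True)),
        asyncio.create_task(job(5)),
    ]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_EXCEPTION)
    failed = [t for t in done if t.exception() is not None]
    # wait() does not cancel what is still pending; do it explicitly.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending), len(failed)


summary = asyncio.run(stop_on_first_error())
print(summary)
```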


But it doesn't end here!

Notice the "Task exception was never retrieved" messages in the logs above.

asyncio.wait() won't re-raise exceptions from the child tasks until you await them individually. (The stack traces in the logs are just messages; they cannot be caught!)

tasks = [asyncio.create_task(c) for c in coros]  # wait() needs Tasks on Python 3.11+
done, pending = await asyncio.wait(tasks)
for tsk in done:
    try:
        await tsk
    except Exception as e:
        print("I caught:", repr(e))

Output -

0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()

On the other hand, to collect every exception with asyncio.gather(), pass return_exceptions=True and inspect the results -

results = await asyncio.gather(*coros, return_exceptions=True)
for result_or_exc in results:
    if isinstance(result_or_exc, Exception):
        print("I caught:", repr(result_or_exc))

(Same output as before)




ANSWER 3

Score 81


asyncio.wait is lower level than asyncio.gather.

As the name suggests, asyncio.gather mainly focuses on gathering the results. It waits on a bunch of futures and returns their results in the order the futures were passed in.

asyncio.wait just waits on the futures. Instead of giving you the results directly, it gives you two sets: the done tasks and the pending tasks. You have to collect the values manually.

Moreover, with wait you can choose, via return_when, whether to wait for all futures to finish or just the first one.
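A short sketch of that difference, using a made-up coroutine delayed: gather() hands back values in argument order, while wait() hands back sets of tasks from which the values are pulled by hand:

```python
import asyncio


async def delayed(value, delay):
    # Hypothetical coroutine: returns its value after a delay.
    await asyncio.sleep(delay)
    return value


async def compare():
    # gather(): results follow argument order, not completion order.
    ordered = await asyncio.gather(delayed("slow", 0.05), delayed("fast", 0.01))

    # wait(): returns (done, pending) sets of tasks; read each result yourself.
    tasks = [asyncio.create_task(delayed("slow", 0.05)),
             asyncio.create_task(delayed("fast", 0.01))]
    done, pending = await asyncio.wait(tasks)
    collected = {t.result() for t in done}
    return ordered, collected, len(pending)


ordered, collected, n_pending = asyncio.run(compare())
print(ordered, collected, n_pending)
```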




ANSWER 4

Score 23


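I also noticed that wait() takes the whole group as a single list (on Python 3.11+ the items must be Tasks or Futures rather than bare coroutines, hence loop.create_task below):

result=loop.run_until_complete(asyncio.wait([
        loop.create_task(say('first hello', 2)),
        loop.create_task(say('second hello', 1)),
        loop.create_task(say('third hello', 4))
    ]))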

Whereas grouping in gather() is done by just specifying multiple coroutines:

result=loop.run_until_complete(asyncio.gather(
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ))