80

What do I gain from using async for? Here is the code I wrote with async for; AIter(10) could be replaced with get_range().

But the code runs synchronously, not asynchronously.

import asyncio

async def get_range():
    for i in range(10):
        print(f"start {i}")
        await asyncio.sleep(1)
        print(f"end {i}")
        yield i

class AIter:
    def __init__(self, N):
        self.i = 0
        self.N = N

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        print(f"start {i}")
        await asyncio.sleep(1)
        print(f"end {i}")
        if i >= self.N:
            raise StopAsyncIteration
        self.i += 1
        return i

async def main():
    async for p in AIter(10):
        print(f"finally {p}")

if __name__ == "__main__":
    asyncio.run(main())

The result I expected is:

start 1
start 2
start 3
...
end 1
end 2
...
finally 1
finally 2
...

However, the real result is:

start 0
end 0
finally 0
start 1
end 1
finally 1
start 2
end 2

I know I could get the expected result by using asyncio.gather or asyncio.wait.

But it is hard for me to understand what I gain by using async for here instead of a simple for.

What is the right way to use async for if I want to loop over several Future objects and use each one as soon as it is finished? For example:

async for f in feature_objects:
    data = await f
    with open("file", "w") as fi:
        fi.write(data)
PaleNeutron
  • @user4815162342, yes, thanks a lot. But I'm still looking for some example of `async source`. Can you add an example usage of `async for` syntax? – PaleNeutron Jun 03 '19 at 07:50
  • Any async generator can serve as an async source. For a more concrete example, see e.g. [this answer](https://stackoverflow.com/a/56280107/1600898), which exposes a sequence of callback invocations as an async iterator that can be iterated using `async for`. – user4815162342 Jun 03 '19 at 08:52
  • btw, you can try aiofiles to handle files in asyncio way – Tsonglew Dec 12 '19 at 09:18
  • a question on the blocking for loop. I could have a regular for loop `for in range(10):` and await inside of it e.g. `await asyncio.sleep(i)`, which would return control to the caller and allow concurrency. Right? Note that of course my sleep is silly as only is meant to simulate an expensive op (also called an io-bound op). – Charlie Parker Jun 22 '22 at 19:15
  • is a good example of the use of `async for` is that `async for` does NOT block since it gets the next items with an implicit `await it.anext_step()` or something? – Charlie Parker Jun 22 '22 at 19:20
  • I'd like to see an example contrasting an `async for` vs `for` -- especially showing the advantages of `async for`, which I assume will be that async gets the next values without blocking. But I'd like to know if other details e.g. if the async for loop implicitly is capable of doing something similar to gather were all future values are obtained. Basically I know await doesn't allow the next execution of the remaining code until the value is returned so I'd like to see perhaps ... – Charlie Parker Jun 22 '22 at 19:23
  • what a similar implementation of the async for would look like with manual awaits (if that is even a question that make sense to ask). – Charlie Parker Jun 22 '22 at 19:23

4 Answers

148

But it is hard for me to understand what I gain by using async for here instead of a simple for.

The underlying misunderstanding is expecting async for to automatically parallelize the iteration. It doesn't do that; it simply allows sequential iteration over an async source. For example, you can use async for to iterate over lines coming from a TCP stream, messages from a websocket, or database records from an async DB driver.
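
As a rough illustration of what "sequential iteration over an async source" buys you, the sketch below runs an async for loop next to an unrelated background task. The names ticker and heartbeat are hypothetical and not part of the question; asyncio.sleep stands in for waiting on a socket or a database. Items are still consumed one after another, but the event loop is free to run other tasks while the loop waits for the next item.

```python
import asyncio


async def ticker(n, delay):
    """Hypothetical async source: yields 0..n-1, waiting before each item."""
    for i in range(n):
        await asyncio.sleep(delay)  # stands in for waiting on a socket or DB
        yield i


async def heartbeat(log, beats, delay):
    """Unrelated background work that runs while the loop below is waiting."""
    for _ in range(beats):
        await asyncio.sleep(delay)
        log.append("beat")


async def main(delay=0.1):
    log = []
    background = asyncio.create_task(heartbeat(log, beats=3, delay=delay))
    # Items arrive strictly in order, but waiting for each one does not
    # block the event loop, so heartbeat() keeps making progress.
    async for item in ticker(3, delay):
        log.append(f"item {item}")
    await background
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The exact interleaving of "beat" and "item" entries depends on timer scheduling, so the sketch makes no claim about it beyond the fact that both kinds of entries make progress during the loop.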

None of the above would work with an ordinary for, at least not without blocking the event loop. This is because for calls __next__ as a blocking function and doesn't await its result. You cannot manually await elements obtained by for because for expects __next__ to signal the end of iteration by raising StopIteration. If __next__ is a coroutine, the StopIteration exception won't be visible before awaiting it. This is why async for was introduced, not just in Python, but also in other languages with async/await and generalized for.
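
To make the mechanics concrete, here is a minimal sketch of what async for does under the hood, written out by hand. The countdown generator is a hypothetical example source. The manual version is only an approximation of the real protocol, but it shows the two points made above: each step is awaited, and the end of iteration is signalled by StopAsyncIteration, which only becomes visible after the await.

```python
import asyncio


async def countdown(n):
    """Hypothetical async generator used only for illustration."""
    while n > 0:
        await asyncio.sleep(0.01)
        yield n
        n -= 1


async def with_async_for():
    seen = []
    async for value in countdown(3):
        seen.append(value)
    return seen


async def by_hand():
    seen = []
    it = countdown(3).__aiter__()         # `async for` obtains the iterator first
    while True:
        try:
            value = await it.__anext__()  # each step is awaited, not just called
        except StopAsyncIteration:        # raised when the source is exhausted
            break
        seen.append(value)
    return seen


if __name__ == "__main__":
    print(asyncio.run(with_async_for()))
    print(asyncio.run(by_hand()))
```

Both functions collect the same values; the second just spells out the awaits that the async for syntax hides.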

If you want to run the loop iterations in parallel, you need to start them as parallel coroutines and use asyncio.as_completed or equivalent to retrieve their results as they come:

async def x(i):
    print(f"start {i}")
    await asyncio.sleep(1)
    print(f"end {i}")
    return i

# run x(0)..x(9) concurrently and process results as they arrive
for f in asyncio.as_completed([x(i) for i in range(10)]):
    result = await f
    # ... do something with the result ...

If you don't care about reacting to results immediately as they arrive, but you need them all, you can make it even simpler by using asyncio.gather:

# run x(0)..x(9) concurrently and process results when all are done
results = await asyncio.gather(*[x(i) for i in range(10)])
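
Both snippets above have to run inside a coroutine. A self-contained sketch that wraps them in a main() started by asyncio.run might look like this; the shortened delay and the arrived and ordered names are choices made for the example only.

```python
import asyncio


async def x(i, delay=0.1):
    await asyncio.sleep(delay)  # stands in for slow I/O
    return i


async def main():
    # as_completed: handle each result as soon as it is ready.
    # The order of arrival is not guaranteed to match the order of submission.
    arrived = []
    for fut in asyncio.as_completed([x(i) for i in range(10)]):
        arrived.append(await fut)

    # gather: wait for everything; results come back in submission order.
    ordered = await asyncio.gather(*[x(i) for i in range(10)])
    return arrived, ordered


if __name__ == "__main__":
    arrived, ordered = asyncio.run(main())
    print(sorted(arrived), ordered)
```

Note that the loop over as_completed is an ordinary for, not an async for: producing the next awaitable does not itself require awaiting, only retrieving its result does.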
Martin Meeser
user4815162342
  • 1
    Checking my understanding -- both of your code snippets (`for f in asyncio.as_completed...` and `results = await ...` would have to be executed within an async function/method, within a call chain kicked off by `asyncio.run(...)`, right? – hBy2Py Mar 10 '21 at 20:17
  • 1
    @hBy2Py Correct. The question (and therefore the answer too) just omits that part for brevity. – user4815162342 Mar 10 '21 at 21:36
  • 1
    I like the explainer, but am missing an example for the `async for` loop – Roelant Mar 21 '21 at 18:28
  • 2
    @Roelant You're right that an example would be useful. This answer tried to address the specific points raised in the question, which made sense at the time, but reduces its value as a general resource. Adding a real-life example at this point would make the answer quite a bit longer than it is now. Hopefully there are other SO questions that clarify the issue and, if not, maybe it's time for a new question. – user4815162342 Mar 21 '21 at 20:27
  • a question on the blocking for loop. I could have a regular for loop `for in range(10):` and await inside of it e.g. `await asyncio.sleep(i)`, which would return control to the caller and allow concurrency. Right? Note that of course my sleep is silly as only is meant to simulate an expensive op (also called an io-bound op). – Charlie Parker Jun 22 '22 at 19:15
  • is a good example of the use of `async for` is that `async for` does NOT block since it gets the next items with an implicit `await it.anext_step()` or something? – Charlie Parker Jun 22 '22 at 19:20
  • @CharlieParker That's correct, an ordinary for loop is perfectly fine when you don't need to await to get the next item (as is the case with `range(10)`). – user4815162342 Jun 23 '22 at 06:09
10

(Adding on the accepted answer - for Charlie's bounty).

Assuming you want to consume each yielded value concurrently, a straightforward way would be:

import asyncio

async def process_all():
    tasks = []

    async for obj in my_async_generator:
        # Python 3.7+. Use ensure_future for older versions.
        task = asyncio.create_task(process_obj(obj))
        tasks.append(task)
    
    await asyncio.gather(*tasks)


async def process_obj(obj):
    ...

Explanation:

Consider the following code, without create_task:

async def process_all():
    async for obj in my_async_generator:
        await process_obj(obj)

This is roughly equivalent to:

async def process_all():
    obj1 = await my_async_generator.__anext__()
    await process_obj(obj1)

    obj2 = await my_async_generator.__anext__()
    await process_obj(obj2)
    
    ...

Basically, the loop cannot move on to the next item until the awaited body has finished. The way to go is to delegate the processing of each item to a new asyncio task, which starts without holding up the loop. Then gather waits for all of the tasks, which means it waits for every iteration to be processed.
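
For a version that can actually be run, here is a minimal sketch with a stand-in source. The body of my_async_generator below is an assumption made for illustration (a real one might read from a socket or a queue), it is called as a function here rather than used as a ready-made object, and process_obj returns a value only so that the result can be inspected.

```python
import asyncio


async def my_async_generator(n=5):
    """Hypothetical source; items become available one at a time."""
    for i in range(n):
        await asyncio.sleep(0.01)  # waiting for the next item to arrive
        yield i


async def process_obj(obj):
    await asyncio.sleep(0.1)       # stands in for slow per-item I/O
    return obj * 2


async def process_all():
    tasks = []
    async for obj in my_async_generator():
        # create_task schedules the coroutine and returns immediately,
        # so the loop can go straight on to fetching the next item.
        tasks.append(asyncio.create_task(process_obj(obj)))
    # Wait for every scheduled task; results keep the order of `tasks`.
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    print(asyncio.run(process_all()))
```

With these delays, fetching the items still happens one after another, while the slower processing of the individual items overlaps.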

matan129
  • 1
    really love your example! Wish we could actually run it though. One quick comment, I think it's useful to mention that the `.create_task(coroutine(args))` function actually dispatches a coroutine to be executed concurrently and doesn't block. – Charlie Parker Jun 27 '22 at 14:05
  • 1
    So, the main difference between `async for` and `for` is the difference between `__next__()` and `__anext__()`? Can you expand the answer a little more. (with a real `my_async_generator` could be much better). – PaleNeutron Jun 28 '22 at 00:55
0

As others have pointed out, async for doesn't create tasks to be run concurrently. It is used to allow sequential iteration over an async source.

As an example, in aiokafka, you could do async for msg in consumer. The __anext__ method in consumer is called in each iteration. This method is defined as async def __anext__ , allowing a call await self.get_one() inside it.

In comparison, when you use a normal for loop, it internally invokes the __next__ special method. However, the regular __next__ method cannot wait for an async source, because it cannot contain a call such as await get_one().
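
The pattern described above can be sketched with a small class of your own. QueueConsumer, its _DONE sentinel, and the producer coroutine are all hypothetical names invented for this example; this is not aiokafka's implementation, only the same shape: an async def __anext__ that awaits a get_one() call.

```python
import asyncio


class QueueConsumer:
    """Hypothetical consumer that exposes an asyncio.Queue via `async for`."""

    _DONE = object()  # sentinel marking the end of the stream (an assumption)

    def __init__(self, queue):
        self._queue = queue

    def __aiter__(self):
        return self

    async def get_one(self):
        # Suspends this coroutine until a message is available.
        return await self._queue.get()

    async def __anext__(self):
        msg = await self.get_one()
        if msg is self._DONE:
            raise StopAsyncIteration
        return msg


async def producer(queue, messages):
    for m in messages:
        await asyncio.sleep(0.01)  # messages trickle in over time
        await queue.put(m)
    await queue.put(QueueConsumer._DONE)


async def main():
    queue = asyncio.Queue()
    feeder = asyncio.create_task(producer(queue, ["a", "b", "c"]))
    received = [msg async for msg in QueueConsumer(queue)]
    await feeder
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

A plain __next__ could not be written this way, because await is only allowed inside an async def.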

l001d
-1

Code based on the fantastic answer from @matan129. It is only missing the async generator to be runnable; once I have that (or if someone wants to contribute it), I will finalize this:


import time

import asyncio


async def process_all():
    """
    Example where the async for loop dispatches many items for concurrent processing without
    waiting on each individual iteration, then waits for all tasks to finish.
    ref:
    - https://stackoverflow.com/questions/56161595/how-to-use-async-for-in-python/72758067#72758067
    """
    tasks = []

    async for obj in my_async_generator:
        # Python 3.7+. Use ensure_future for older versions.
        task = asyncio.create_task(process_obj(obj))  # concurrently dispatches a coroutine to be executed.
        tasks.append(task)

    await asyncio.gather(*tasks)


async def process_obj(obj):
    await asyncio.sleep(5)  # expensive IO


if __name__ == '__main__':
    # - test asyncio
    s = time.perf_counter()
    asyncio.run(process_all())
    # - print stats
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")
    print('Success, done!\a')
Charlie Parker