4

I am new to asyncio and trying to understand basic for loop behavior. The code below executes sequentially, but my naive assumption was that while the sleeps are occurring, other items could be fetched via the for loop and start processing. But that doesn't seem to happen.

For example, while the code is "doing something else with 1" it seems like it could fetch the next item from the loop and start working on it while waiting for the sleep to end on item 1. But when I run, it executes sequentially with pauses for the sleeps like a non-async program.

What am I missing here?

import asyncio


class CustomIterator():
    def __init__(self):
        self.counter = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.counter >= 3:
            raise StopAsyncIteration
        await asyncio.sleep(1)
        self.counter += 1
        return self.counter


async def f(item):
    print(f"doing something with {item}")
    await asyncio.sleep(3)


async def f2(item):
    print(f"doing something else with {item}")
    await asyncio.sleep(2)


async def do_async_stuff():
    async for item in CustomIterator():
        print(f"got {item}")
        await f(item)
        await f2(item)


if __name__ == '__main__':
    asyncio.run(do_async_stuff())

Output:

got 1
doing something with 1
doing something else with 1
got 2
doing something with 2
doing something else with 2
got 3
doing something with 3
doing something else with 3
mkrieger1
  • 19,194
  • 5
  • 54
  • 65
chacmool
  • 1,357
  • 2
  • 16
  • 21
  • Does this answer your question? [How to run multiple coroutines concurrently using asyncio?](https://stackoverflow.com/questions/32054066/how-to-run-multiple-coroutines-concurrently-using-asyncio) – mkrieger1 Mar 13 '23 at 20:43
  • 1
    The accepted answer of this question is explaining very well, what `async for` is made for, i.e. not automatic parallelisation: https://stackoverflow.com/questions/56161595/how-to-use-async-for-in-python – user_na Mar 14 '23 at 07:25

4 Answers4

4

The entire point of await is to create a sequence and yield point: when you await foo, the goal is to give control to the executor so it can run other tasks, until whatever foo is resolves and you get control back.

If you want to create concurrency in async code, you need to either:

  1. create tasks (using the appropriately named create_task), each task is an other thing the executor can run while one task is await-ing
  2. compose coroutines together using utilities like wait, as_completed, or gather which register multiple objects against the executor at the same time, this way their asynchronous resolution will overlap rather than chain
Masklinn
  • 34,759
  • 3
  • 38
  • 57
2

I think you have a common misunderstanding of how async works. You have written your program to be synchronous. await foo() says to call foo(), and feel free to go do something else while we're waiting for foo to return with its answer. Likewise, getting the next element from your custom iterator says "get the next element of this iterator, but feel free to go do something else while waiting for the result". In both cases, you have nothing else to do, so your code wants.

If it is safe for two things in your code to run at once, it is your job to say so, using appropriate primitives.

Frank Yellin
  • 9,127
  • 1
  • 12
  • 22
  • Ok, that explanation helps, I guess it's the "you have nothing else to do" part that is confusing me. I thought the remainder of the program was the "other stuff to do". Will keep reading up on this stuff. – chacmool Mar 13 '23 at 22:12
  • If it is safe to do multiple parts of your stuff at the same time, you have to let it know. – Frank Yellin Mar 14 '23 at 00:34
2

The previous answers describe well why everything behaves like it does. Here is an actual example of how to use a task to archive your wanted concurency :

import asyncio
import time

class CustomIterator():
    def __init__(self):
        self.counter = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.counter >= 3:
            raise StopAsyncIteration
        await asyncio.sleep(1)
        self.counter += 1
        print(f'got {self.counter}')
        return self.counter

async def f(item, stime):
    print(f"start f with {item} at t={time.time() - stime:.1f}")
    await asyncio.sleep(5)
    print(f"done f with {item} at t={time.time() - stime:.1f}")


async def f2(item, stime):
    print(f"start f2 with {item} at t={time.time() - stime:.0f}")
    await asyncio.sleep(2)
    print(f"done f2 with {item} at t={time.time() - stime:.0f}")

async def do_async_stuff():
    tasks = []
    t = time.time()
    async for obj in CustomIterator():
        tasks.append(asyncio.create_task(f(obj, t)))
        tasks.append(asyncio.create_task(f2(obj, t)))
    
    await asyncio.gather(*tasks)

    
if __name__ == '__main__':
    asyncio.run(do_async_stuff())

Which outputs:

got 1
start f with 1 at t=1.0
start f2 with 1 at t=1.0
got 2
start f with 2 at t=2.0
start f2 with 2 at t=2.0
done f2 with 1 at t=3.0
got 3
start f with 3 at t=3.0
start f2 with 3 at t=3.0
done f2 with 2 at t=4.0
done f2 with 3 at t=5.0
done f with 1 at t=6.0
done f with 2 at t=7.0
done f with 3 at t=8.0
user_na
  • 2,154
  • 1
  • 16
  • 36
0

To add more context on the other great answers, I'd like to detail a bit more in depth some definitions (quite simplified) which may help you to better understand the issue:

  • Concurrency refers to the ability to execute multiple tasks at seemingly the same period of time.
  • Parallelism refers to the ability to execute multiple tasks at physically the same period of time.
  • Asynchrony refers to actual ways to deal with concurrent tasks.

They are many of such ways of dealing with concurrent tasks at the application level, such as:

  • callback-based (callback functions),
  • dispatch queues,
  • the future and promise pattern,
  • the async/await pattern,
  • etc.

Using one of these forms of asynchronous programming does not guarantee that your program will execute concurrently, you often have to explicitly state that.

AsyncIO —as stated in the documentation— is "a library to write concurrent code using the async/await syntax.", and as its name suggests, is mainly dedicated to deal with I/O (Input/Output) asynchronous events (but not limited to).

To create a concurrent program using AsyncIO you need two things:

  • using the async/await syntax, and
  • explicitly schedule tasks concurrently, for instance with asyncio.gather and asyncio.create_task.

Without that, your program will generally executes sequentially as you observed in your example.

Louis Lac
  • 5,298
  • 1
  • 21
  • 36