I have the following code:

import asyncio

async def myfunc(i):
    print("hello", i)
    await asyncio.sleep(i)
    print("world", i)

async def main():
    asyncio.create_task(myfunc(2))
    asyncio.create_task(myfunc(1))
    
asyncio.run(main())

It outputs:

hello 2
hello 1

Notice that world isn't printed anywhere. Why is the output we see being produced? I was expecting:

hello 2
hello 1
world 1
world 2

I thought that the asyncio.sleep(i) calls would yield execution to the event loop, and that the event loop would then resume each coroutine after its respective wait time. Clearly I am misunderstanding something. Can someone explain?

Bunny

2 Answers

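The problem is that main() returns right after creating the tasks, without waiting for them. When main() returns, asyncio.run() cancels any tasks that are still pending and closes the event loop, so each task only runs up to its first await and never prints "world". Use asyncio.gather() to launch the coroutines and wait for all of them to finish.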

import asyncio


async def myfunc(i):
    print("hello", i)
    await asyncio.sleep(i)
    print("world", i)


async def main():
    await asyncio.gather(myfunc(2), myfunc(1))


asyncio.run(main())
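
If the tasks have to be started with asyncio.create_task() (for example because main() must keep doing other work in the meantime), one option is to keep references to the tasks and await them before main() returns. The following is only a minimal sketch under that assumption: the events list is a hypothetical helper added here to make the ordering visible, and the sleep times are scaled down so the example finishes quickly.

```python
import asyncio

events = []  # hypothetical log, used only to make the ordering visible


async def myfunc(i):
    events.append(("hello", i))
    await asyncio.sleep(i / 10)  # scaled down so the sketch runs quickly
    events.append(("world", i))


async def main():
    # Keep references to the tasks so they can be awaited later.
    tasks = [
        asyncio.create_task(myfunc(2)),
        asyncio.create_task(myfunc(1)),
    ]
    # Other non-blocking work could happen here while the tasks run.
    # Waiting before main() returns lets both tasks run to completion.
    await asyncio.gather(*tasks)


asyncio.run(main())
print(events)
```

Because main() does not return until both tasks are done, asyncio.run() has nothing left to cancel.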

The logic that you describe in the comments is more complicated, and there is no function that implements it, so you have to design the logic yourself. In this case, two conditions must hold before the application can finish:

  • All tasks must be launched.
  • There should be no active task.

With this in mind, I use a Future as a flag that signals when both conditions are met, and add_done_callback to get notified each time a task finishes.

import asyncio
from collections import deque
import random


async def f(identifier, seconds):
    print(f"Starting task: {identifier}, seconds: {seconds}s")
    await asyncio.sleep(seconds)
    print(f"Finished task: {identifier}")
    return identifier


T = 3


class Manager:
    def __init__(self, q):
        self._q = q
        # Manager is created inside a coroutine, so a running loop exists.
        self._future = asyncio.get_running_loop().create_future()
        self._active_tasks = set()
        self._status = False

    @property
    def q(self):
        return self._q

    def launch_task(self):
        try:
            identifier = self.q.pop()
        except IndexError:
            return False
        else:
            seconds = random.uniform(T - 1, T + 1)
            task = asyncio.create_task(f(identifier, seconds))
            self._active_tasks.add(identifier)
            task.add_done_callback(self._finished_callback)
            return True

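    async def work(self):
        self._status = True
        while self.launch_task():
            await asyncio.sleep(T)
        self._status = False
        # If every task already finished while launching was still in progress,
        # no callback will set the future, so only wait when tasks remain.
        if self._active_tasks:
            await self._future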

    def _finished_callback(self, c):
        self._active_tasks.remove(c.result())
        if not self._active_tasks and not self._status:
            self._future.set_result(None)


async def main():
    identifiers = deque(range(10))
    manager = Manager(identifiers)
    await manager.work()


if __name__ == "__main__":
    asyncio.run(main())
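
For comparison, the same two conditions can also be met without a Future or callbacks by collecting the tasks in a list and gathering them once launching is done. This is a rough sketch, not the approach used above: worker, launch_all, and finished are hypothetical names, and the short interval is chosen only so that the example runs quickly.

```python
import asyncio
from collections import deque

finished = []  # hypothetical record of completed jobs


async def worker(identifier, seconds):
    await asyncio.sleep(seconds)
    finished.append(identifier)
    return identifier


async def launch_all(queue, interval):
    tasks = []
    # Launch one task per queue entry, pausing between launches.
    while queue:
        identifier = queue.pop()
        tasks.append(asyncio.create_task(worker(identifier, interval * 1.5)))
        await asyncio.sleep(interval)
    # Everything has been launched; wait for whatever is still running.
    return await asyncio.gather(*tasks)


results = asyncio.run(launch_all(deque(range(5)), 0.01))
print(results)
```

asyncio.gather() returns results in the order the tasks were passed in, and tasks that have already finished are simply collected, so the function returns only after every launched task is done.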
eyllanesc
  • I see. How would I approach this in the case where I create_task every 2 seconds in a loop in main until a network I/O operation has completed? I'd like to do something like the equivalent of pthread_join after the loop, is there such an equivalent? I'd use gather but I need the loop to be non blocking – Bunny Mar 19 '22 at 21:07
  • @CodeM4aster I don't understand your comment; avoid talking about networks and pthreads. You want to launch a new task every T seconds, and this task can take N seconds. Am I right? If so, is there an event that should cause the program to terminate, or should it run forever? – eyllanesc Mar 19 '22 at 21:17
  • Not quite. In a loop I create a new task every T seconds. Each consumes from a global job queue and loops until it is empty. The loop that is creating tasks is also looping until the global queue is empty. As soon as that loop finishes it ends the program even though some tasks haven't finished. I'd like to wait until they finish before exiting – Bunny Mar 19 '22 at 22:04
  • @CodeM4aster From what I understand, you have N predefined tasks that are launched sequentially every T seconds, and you want the program to exit when all the tasks finish executing. Am I correct? – eyllanesc Mar 19 '22 at 22:07
  • Almost, the tasks are not predefined. When I create a task I need information that is only available at runtime. There is no upper bound on N, I keep creating new tasks every T seconds until the job queue empties, at which point I should make no more tasks but let the existing tasks complete – Bunny Mar 19 '22 at 22:09
  • @CodeM4aster N depends on the task queue, that's what I mean. I point out the predefined precisely because everything depends on an initial fixed data: the queue. – eyllanesc Mar 19 '22 at 22:12
  • My mistake, N does depend on the queue, but my tasks are not predefined. The job queue is a queue of data blocks to download from peers. A single peer has some fixed upload bandwidth and my download bandwidth is unlimited. I am made aware of a new peer every T seconds. Hence I create a task for each new peer I am made aware of rather than for each data block. As soon as all blocks have been consumed from the queue, each task should finish downloading its final block then the program should exit – Bunny Mar 19 '22 at 22:21
  • @CodeM4aster see the update – eyllanesc Mar 19 '22 at 22:44
Found a much simpler solution than the one provided by @eyllanesc here. It turns out there is a function that implements it.

Bunny