
I need to be able to keep adding coroutines to the asyncio loop at runtime. I tried using create_task() thinking that this would do what I want, but it still needs to be awaited.

This is the code I have. Is there a simple edit that would make it work?

async def get_value_from_api():
    global ASYNC_CLIENT
    return ASYNC_CLIENT.get(api_address)


async def print_subs():
    count = await get_value_from_api()
    print(count)


async def save_subs_loop():
    while True:
        asyncio.create_task(print_subs())
        time.sleep(0.1)


async def start():
    global ASYNC_CLIENT
    async with httpx.AsyncClient() as ASYNC_CLIENT:
        await save_subs_loop()


asyncio.run(start())
Jonny Shanahan
  • I found [this question](https://stackoverflow.com/questions/37278647/fire-and-forget-python-async-await) and I think it might be useful, but I'm unsure if it's a duplicate. Could you check it out and see if it helps you? – Marc Sances Dec 19 '21 at 12:39
  • Another similar approach can be inspired from this answer for periodic addition: https://stackoverflow.com/questions/37512182/how-can-i-periodically-execute-a-function-with-asyncio?noredirect=1&lq=1 – Jishan Shaikh Dec 19 '21 at 12:46
  • So you want to gather results from a dynamic queue e.g. `async for done in consume(queue): queue.extend(do_something(done))` – amirouche Dec 19 '21 at 12:46
  • How do you know that there is no more work, and the `async for` should stop? – amirouche Dec 19 '21 at 12:47
  • That is a classic producer-consumer pattern. – amirouche Dec 19 '21 at 12:48
  • @MarcSances Thanks, but that answer seems to suggest what I have already done, or using `ensure_future()` instead of `create_task()`. I have tried both, and nothing happens unless I await the task, in which case it's no longer asynchronous. I don't really understand why it's not working – Jonny Shanahan Dec 19 '21 at 12:54
  • @JishanShaikh I think that answer is effectively synchronous, because an `await` inside the while loop blocks the loop, so each `asyncio.sleep` call runs sequentially. What I need is something that doesn't have to finish before the next task is created. – Jonny Shanahan Dec 19 '21 at 13:01

3 Answers


asyncio.create_task() works the way you expect: it schedules the coroutine without needing to be awaited. The problem is that your loop never gives the event loop a chance to run those tasks:

async def save_subs_loop():
    while True:
        asyncio.create_task(print_subs())
        time.sleep(0.1) # do not use time.sleep() in async code EVER

save_subs_loop() keeps creating tasks, but control is never yielded back to the event loop because there is no await in the loop body, and time.sleep() blocks the whole thread. Try:

async def save_subs_loop():
    while True:
        asyncio.create_task(print_subs())
        await asyncio.sleep(0.1) # yield control back to loop to give tasks a chance to actually run

This problem is so common that I think Python should raise a RuntimeError if it detects time.sleep() within a coroutine :-)
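
For reference, here is a minimal, self-contained sketch of the fixed loop. It is not the asker's exact code: `fake_api_call` is a hypothetical stand-in for the httpx request, the loop is bounded so the script terminates, and the `background_tasks` set is an addition that keeps a reference to each task, which the asyncio documentation recommends so that a task cannot be garbage-collected mid-execution.

```python
import asyncio

results = []              # filled in by the tasks as they complete
background_tasks = set()  # strong references to the in-flight tasks


async def fake_api_call(n: int) -> int:
    # Hypothetical stand-in for the HTTP request in the question
    await asyncio.sleep(0.05)
    return n


async def print_subs(n: int) -> None:
    results.append(await fake_api_call(n))


async def save_subs_loop(iterations: int) -> None:
    for n in range(iterations):
        task = asyncio.create_task(print_subs(n))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)
        # Non-blocking sleep: yields to the event loop so the tasks can run
        await asyncio.sleep(0.01)


async def main(iterations: int = 5) -> list:
    results.clear()
    await save_subs_loop(iterations)
    # Wait for whatever is still in flight before the event loop shuts down
    await asyncio.gather(*background_tasks)
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

New tasks are created every 0.01 s even though each one takes 0.05 s, so several requests are in flight at the same time, which is the behaviour the question asks for.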

thisisalsomypassword

I once built a similar pattern when I was mixing trio and kivy, as a demonstration of running multiple coroutines asynchronously.

It used a trio memory channel, which is roughly equivalent to asyncio.Queue, so I'll just refer to it as a queue here.

The main idea is:

  1. Wrap each task in a class that has a run method.
  2. Have that async run method put the object itself back into the queue when execution is done.
  3. Create a global task-spawning loop that waits for objects in the queue and creates a task for each one.

import asyncio
import traceback

import httpx


async def task_1(client: httpx.AsyncClient):
    resp = await client.get("http://127.0.0.1:5000/")
    print(resp.read())
    await asyncio.sleep(0.1)  # without this delay the client could get IP-banned


async def task_2(client: httpx.AsyncClient):
    resp = await client.get("http://127.0.0.1:5000/meow/")
    print(resp.read())
    await asyncio.sleep(0.5)


class CoroutineWrapper:
    def __init__(self, queue: asyncio.Queue,  coro_func, *param):
        self.func = coro_func
        self.param = param
        self.queue = queue

    async def run(self):
        try:
            await self.func(*self.param)
        except Exception:
            traceback.print_exc()
            return
        
        # put itself back into queue
        await self.queue.put(self)


class KeepRunning:
    def __init__(self):
        # queue for gathering CoroutineWrapper
        self.queue = asyncio.Queue()

    def add_task(self, coro, *param):
        wrapped = CoroutineWrapper(self.queue, coro, *param)
        
        # add tasks to be executed in queue
        self.queue.put_nowait(wrapped)

    async def task_processor(self):
        task: CoroutineWrapper
        while task := await self.queue.get():
            # wait for a new CoroutineWrapper object, then schedule its async run() method
            asyncio.create_task(task.run())


async def main():
    keep_running = KeepRunning()
    async with httpx.AsyncClient() as client:
        keep_running.add_task(task_1, client)
        keep_running.add_task(task_2, client)

        await keep_running.task_processor()

asyncio.run(main())

Server:

import time

from flask import Flask
app = Flask(__name__)


@app.route("/")
def hello():
    return str(time.time())


@app.route("/meow/")
def meow():
    return "meow"


app.run()

Output:

b'meow'
b'1639920445.965701'
b'1639920446.0767004'
b'1639920446.1887035'
b'1639920446.2986999'
b'1639920446.4067013'
b'meow'
b'1639920446.516704'
b'1639920446.6267014'
...

You can see the tasks running repeatedly, each at its own pace.
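
The same requeue idea can be sketched without a server, under a few assumptions: `worker`, `run_and_requeue` and `spawner` are hypothetical names, the HTTP calls are replaced by plain delays, and the spawner stops after `max_runs` jobs so that the script terminates (the version above runs forever).

```python
import asyncio


async def worker(name: str, delay: float, log: list) -> None:
    # Stand-in for task_1 / task_2: no network, just a delay
    await asyncio.sleep(delay)
    log.append(name)


async def run_and_requeue(queue: asyncio.Queue, job: tuple) -> None:
    name, delay, log = job
    await worker(name, delay, log)
    await queue.put(job)  # hand the job back so it gets scheduled again


async def spawner(queue: asyncio.Queue, max_runs: int) -> None:
    running = set()  # strong references to in-flight tasks
    for _ in range(max_runs):
        job = await queue.get()  # waits until some job is ready to run again
        task = asyncio.create_task(run_and_requeue(queue, job))
        running.add(task)
        task.add_done_callback(running.discard)
    await asyncio.gather(*running)  # let the last runs finish


async def main(max_runs: int = 6) -> list:
    log = []
    queue = asyncio.Queue()
    queue.put_nowait(("fast", 0.01, log))
    queue.put_nowait(("slow", 0.05, log))
    await spawner(queue, max_runs)
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because each job re-enters the queue only after it finishes, the fast job is rescheduled several times while the slow one is still running, and further jobs can be added at any point with `queue.put_nowait(...)`.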


Old answer

It seems like you only want to cycle through a fixed set of tasks.

In that case, just iterate over a list of coroutine functions with itertools.cycle.

This is no different from running them synchronously, though, so let me know if you need it to be truly asynchronous.

import asyncio
import itertools

import httpx


async def main_task(client: httpx.AsyncClient):
    resp = await client.get("http://127.0.0.1:5000/")
    print(resp.read())
    await asyncio.sleep(0.1)  # without this delay the client could get IP-banned


async def main():
    async with httpx.AsyncClient() as client:
        for coroutine in itertools.cycle([main_task]):
            await coroutine(client)


asyncio.run(main())

Server:

import time

from flask import Flask
app = Flask(__name__)


@app.route("/")
def hello():
    return str(time.time())


app.run()

Output:

b'1639918937.7694323'
b'1639918937.8804302'
b'1639918937.9914327'
b'1639918938.1014295'
b'1639918938.2124324'
b'1639918938.3204308'
...
jupiterbjy

You might want to try the TaskThread framework:

  • It allows you to add tasks at runtime.
  • Tasks are rescheduled periodically (like in your while loop above).
  • It has a built-in producer/consumer framework (parent/child relationships), which you seem to need.

Disclaimer: I wrote TaskThread out of necessity, and it's been a lifesaver.

El Sampsa