
Pretty new to Python's asyncio library and I have been banging my head trying to implement a keep-alive task. I want to concurrently run a CPU-intensive task and a keep-alive task. The keep-alive should run periodically until the CPU-intensive task finishes.

import asyncio
import time

async def cpu_intensive():
    print("cpu_intensive for 3 seconds")
    delay = 3
    close_time = time.time() + delay
    while True:
        if time.time() > close_time:
            break


async def keep_alive():
    print("keep alive for 1 second") # My real use case is I want to send a heart beat message every x seconds until cpu intensive finished
    await asyncio.sleep(1)

async def main():
    cpu_intensive_task = asyncio.create_task(cpu_intensive())
    keep_alive_task = asyncio.create_task(keep_alive())
    print(f"Started at {time.strftime('%X')}")
    # TODO: Not sure how to achieve the expected output
    print(f"Finished at {time.strftime('%X')}")

asyncio.run(main())

'''
Expected Output
Started at 23:55:08
cpu_intensive for 3 seconds
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
Finished at 23:55:11
'''

I have browsed through the asyncio library docs and tried several APIs such as await, run_coroutine_threadsafe, and asyncio.gather, but couldn't get it to work.

xuanyue
  • You probably want to look into synchronization primitives like asyncio.Event(). Using a synchronization primitive allows you to share state between threads/processes in a thread-safe manner. That should let your listener process wait for the resource-heavy process to finish. – Jared Dobry Nov 18 '21 at 05:28
  • If you're looking to run a CPU-intensive task, you'll want to use something like `ProcessPoolExecutor` from concurrent.futures. asyncio is meant for I/O-bound tasks. – dirn Nov 19 '21 at 01:17
  • Just to add to the above, `asyncio` uses the same thread, so you will run into the GIL if you do a CPU-intensive task. – Nishant Nov 19 '21 at 07:36
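
As the comments suggest, one option for genuinely CPU-bound work is to push it off the event loop into a process pool and let a heartbeat coroutine keep running on the loop. Below is a minimal sketch of that idea, assuming the function names from the question; using loop.run_in_executor with a ProcessPoolExecutor and cancelling the heartbeat afterwards is just one possible arrangement:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


def cpu_intensive():
    # Plain (non-async) CPU-bound function; it runs in a worker process,
    # so it never blocks the event loop.
    print("cpu_intensive for 3 seconds")
    close_time = time.time() + 3
    while time.time() < close_time:
        pass


async def keep_alive():
    # Heartbeat coroutine; runs on the event loop until cancelled.
    while True:
        print("keep alive for 1 second")
        await asyncio.sleep(1)


async def main():
    print(f"Started at {time.strftime('%X')}")
    loop = asyncio.get_running_loop()
    heartbeat = asyncio.create_task(keep_alive())
    with ProcessPoolExecutor() as pool:
        # run_in_executor wraps the blocking call in an awaitable future.
        await loop.run_in_executor(pool, cpu_intensive)
    heartbeat.cancel()  # stop the heartbeat once the CPU work is done
    try:
        await heartbeat
    except asyncio.CancelledError:
        pass
    print(f"Finished at {time.strftime('%X')}")


if __name__ == "__main__":
    asyncio.run(main())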

2 Answers


I think you might have confused the concept of concurrency with parallelism. Let me write down some of what I learned while playing with asyncio:

Ways of achieving concurrency

  • Parallel: 'physical' concurrency, where more than one line of code can be executing simultaneously at a given moment.

    • library: multiprocessing
    • pro: Can utilize multiple cores.
    • con: Takes some resources to create processes. High overhead when communicating with processes (pickle serialization is used). The workload has to be picklable.
  • Asynchronous (await/async): 'perceived' concurrency, where no more than one line of code executes at any given time, but concurrency is achieved via context switching. Uses the await keyword to allow context switches.

    • libraries: asyncio, curio, trio
    • pro: Can utilize one core better than synchronous code. Much more lightweight. Control flow is more predictable than with threading (context switching ONLY happens at await keywords).
    • con: Cannot utilize multiple cores. Can't run more than one piece of code at any given time. Can't switch context in the middle of a heavy workload.
  • Asynchronous (time division): aka threads. A thread in Python can only execute one line of code at any given time due to the GIL, so it shares the limitations above.

    • library: threading
    • pro: Can utilize one core better than synchronous code (because it runs other code while waiting). Much more lightweight. Since it uses time division, it can keep running even under a CPU-heavy workload (by briefly pausing the workload and executing another thread).
    • con: Cannot utilize multiple cores. Can't run more than one piece of code at any given time. Control flow is hard to predict.

So any CPU-intensive workload is better off parallelized.

Any IO-bound workload (i.e. waiting) is better written asynchronously, because we don't need to utilize more cores anyway. A concrete example follows.
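
To make the IO-bound case concrete, here is a tiny illustrative sketch (fetch is just a stand-in name for a network or disk call) where three one-second waits finish in roughly one second of wall-clock time, because the event loop switches between them while they wait:

import asyncio
import time


async def fetch(name: str, delay: float) -> str:
    # Stand-in for an IO-bound operation (HTTP request, disk read, ...).
    await asyncio.sleep(delay)
    return f"{name} done"


async def main():
    start = time.perf_counter()
    results = await asyncio.gather(fetch("a", 1), fetch("b", 1), fetch("c", 1))
    print(results, f"took {time.perf_counter() - start:.1f}s")  # ~1.0s, not 3.0s


asyncio.run(main())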

Code fixes

asyncio

You need to await something in the cpu_intensive coroutine.

As shown in this SO post, we can use await asyncio.sleep(0) to add a context-switching point inside the workload. Of course this is not the desired way of writing asynchronous code, but if you need to attach such a function to async code, it's one way to do it.

import asyncio
import time


async def cpu_intensive():
    print("cpu_intensive for 3 seconds")
    duration = 3
    close_time = time.time() + duration
    while True:
        if time.time() > close_time:
            break

        await asyncio.sleep(0)


async def keep_alive():
    while True:
        print("keep alive for 1 second")
        await asyncio.sleep(1)


async def main():
    print(f"Started at {time.strftime('%X')}")

    cpu_intensive_task = asyncio.create_task(cpu_intensive())
    asyncio.create_task(keep_alive())

    await cpu_intensive_task

    print(f"Finished at {time.strftime('%X')}")


asyncio.run(main())

"""
Started at 04:13:09
cpu_intensive for 3 seconds
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
Finished at 04:13:12
"""

There's one extra keep-alive line because keep_alive prints first and then waits 1 second.

Do note that asyncio.sleep schedules a wakeup event rather than waiting an exact amount of time. Think of it as: "Do whatever you want while I sleep, but just make sure to call me back after X seconds."
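
If you'd rather not leave keep_alive running forever and rely on asyncio.run() tearing it down, one variation is to cancel the heartbeat task explicitly once the CPU task completes. This sketch reuses cpu_intensive and keep_alive (and the imports) from the snippet above and only changes main():

async def main():
    print(f"Started at {time.strftime('%X')}")

    cpu_intensive_task = asyncio.create_task(cpu_intensive())
    keep_alive_task = asyncio.create_task(keep_alive())

    await cpu_intensive_task

    # Stop the heartbeat explicitly instead of leaving it running.
    keep_alive_task.cancel()
    try:
        await keep_alive_task
    except asyncio.CancelledError:
        pass

    print(f"Finished at {time.strftime('%X')}")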


P.S.

Later, at some point, you may run into the instability, awkward error handling, or inconsistencies of asyncio and stumble upon trio like I did; for that case I'm leaving a trio example as well.

trio

import trio
import time


class TaskDoneException(Exception):
    pass


async def cpu_intensive():
    print("cpu_intensive for 3 seconds")
    duration = 3
    close_time = time.time() + duration
    while True:
        if time.time() > close_time:
            raise TaskDoneException()

        await trio.sleep(0)


async def keep_alive():
    while True:
        print("keep alive for 1 second")
        await trio.sleep(1)


async def main():
    try:
        async with trio.open_nursery() as nursery:

            print(f"Started at {time.strftime('%X')}")
            nursery.start_soon(cpu_intensive)
            nursery.start_soon(keep_alive)
    except TaskDoneException:
        print(f"Finished at {time.strftime('%X')}")


trio.run(main)


'''
Output:
Started at 17:43:45
keep alive for 1 second
cpu_intensive for 3 seconds
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
Finished at 17:43:48
'''

jupiterbjy

Have you perhaps tried using threading? You can implement it easily like this.

import threading
import time
from datetime import datetime

def cpu_intensive():
    time.sleep(10)  # this is the time-consuming task
    print("CPU INTENSIVE TASK DONE")

def keep_alive():
    now = datetime.now() # current date and time
    date_time = now.strftime("%m/%d/%Y, %H:%M:%S")
    print(f"{date_time}  -- BUSY STATUS / SIGNAL CODE")

#Starting the cpu_intensive task
thread = threading.Thread(target=cpu_intensive)
now = datetime.now() # current date and time
date_time = now.strftime("%m/%d/%Y, %H:%M:%S")
print("STARTING CPU PROCESS: ", date_time) 
thread.start()

# Doing something while the task is alive
while thread.is_alive():
    keep_alive()
    time.sleep(1)
thread.join()
print("TASK COMPLETE")

The output should look like this

STARTING CPU PROCESS:  11/18/2021, 14:53:05
11/18/2021, 14:53:05  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:06  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:07  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:08  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:09  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:10  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:11  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:12  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:13  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:14  -- BUSY STATUS / SIGNAL CODE
CPU INTENSIVE TASK DONE
TASK COMPLETE

You can also flip it around and have the thread run keep_alive instead, as in the sketch below.
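
For example, a rough sketch of that flipped arrangement (the stop event and the exact names here are just illustrative) could look like this:

import threading
import time
from datetime import datetime


def keep_alive(stop_event: threading.Event):
    # Print a heartbeat every second until the main thread signals us to stop.
    while not stop_event.is_set():
        now = datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
        print(f"{now}  -- BUSY STATUS / SIGNAL CODE")
        stop_event.wait(1)  # like sleep(1), but wakes early when the event is set


def cpu_intensive():
    time.sleep(10)  # the time-consuming task runs in the main thread this time
    print("CPU INTENSIVE TASK DONE")


stop = threading.Event()
heartbeat = threading.Thread(target=keep_alive, args=(stop,))
heartbeat.start()

cpu_intensive()

stop.set()        # tell the heartbeat thread to exit
heartbeat.join()
print("TASK COMPLETE")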

Kurt Rojas