In most of the asynchronous coroutines I write, I only need to replace the function definition def func() with async def func() and the sleep call time.sleep(s) with await asyncio.sleep(s).
Is it possible to convert a standard Python function into an async function in which every time.sleep(s) is replaced by await asyncio.sleep(s)?
Example
Performance during task
Measure performance during a task
import asyncio
import random


async def performance_during_task(task):
    stop_event = asyncio.Event()

    target_task = asyncio.create_task(task(stop_event))
    perf_task = asyncio.create_task(measure_performance(stop_event))

    await target_task
    await perf_task


async def measure_performance(event):
    while not event.is_set():
        print('Performance: ', random.random())
        await asyncio.sleep(.2)


if __name__ == "__main__":
    asyncio.run(
        performance_during_task(task)
    )
Task
The task has to be defined with async def and await asyncio.sleep(s):
async def task(event):
    for i in range(10):
        print('Step: ', i)
        await asyncio.sleep(.2)
    event.set()
I would like to turn this into the following.
Easy task definition
So that others do not have to worry about async details, I want them to be able to define a task as a normal function (e.g. with a decorator?):
import time


# as_async is the decorator I am looking for; it does not exist yet
@as_async
def easy_task(event):
    for i in range(10):
        print('Step: ', i)
        time.sleep(.2)
    event.set()
The decorated function should then be usable as an async function, e.g. with performance_during_task().
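To make the idea concrete, here is a minimal sketch of what such a decorator might look like, assuming a source-rewriting approach with the standard ast module. The names _SleepToAwait and asyncify_source are hypothetical helpers introduced only for this illustration; as_async is the decorator name from the question. The sketch only recognizes calls spelled exactly time.sleep(...), needs the function's source to be available to inspect.getsource, runs the rewritten function against a copy of the module globals, and does not handle closures or sleeps hidden inside helper functions.

```python
import ast
import asyncio
import inspect
import textwrap
import time


class _SleepToAwait(ast.NodeTransformer):
    """Hypothetical helper: rewrite `time.sleep(x)` to `await asyncio.sleep(x)`."""

    def visit_FunctionDef(self, node):
        # Leave nested synchronous functions untouched; an `await` inside
        # them would be a syntax error.
        return node

    visit_Lambda = visit_FunctionDef

    def visit_Call(self, node):
        self.generic_visit(node)
        func = node.func
        if (
            isinstance(func, ast.Attribute)
            and func.attr == "sleep"
            and isinstance(func.value, ast.Name)
            and func.value.id == "time"
        ):
            # Swap the module name, then wrap the call in an await expression.
            func.value = ast.Name(id="asyncio", ctx=ast.Load())
            return ast.Await(value=node)
        return node


def asyncify_source(source, name, namespace):
    """Hypothetical helper: compile `source` and return `name` as a coroutine function."""
    tree = ast.parse(textwrap.dedent(source))
    func_def = next(
        n for n in tree.body if isinstance(n, ast.FunctionDef) and n.name == name
    )

    # Rewrite the sleep calls in the top-level body of the function.
    transformer = _SleepToAwait()
    func_def.body = [transformer.visit(stmt) for stmt in func_def.body]

    # Rebuild the definition as `async def`, dropping decorators so that
    # @as_async is not applied a second time when the code is executed.
    fields = {f: getattr(func_def, f) for f in func_def._fields}
    fields["decorator_list"] = []
    async_def = ast.copy_location(ast.AsyncFunctionDef(**fields), func_def)

    module = ast.Module(body=[async_def], type_ignores=[])
    ast.fix_missing_locations(module)

    # Assumption: a copy of the caller's globals plus `asyncio` is enough.
    scope = dict(namespace)
    scope.setdefault("asyncio", asyncio)
    exec(compile(module, filename="<as_async>", mode="exec"), scope)
    return scope[name]


def as_async(func):
    """Decorator sketch: return a coroutine-function version of `func`."""
    return asyncify_source(inspect.getsource(func), func.__name__, func.__globals__)
```

With this in place, easy_task decorated with @as_async in a regular .py file should be accepted by performance_during_task() like the hand-written async task. An alternative that avoids source rewriting is to run the unchanged function in a worker thread with asyncio.to_thread, but asyncio.Event is not thread-safe, so event.set() would then have to be scheduled on the event loop rather than called directly from the thread.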