107

I'm migrating from tornado to asyncio, and I can't find the asyncio equivalent of tornado's PeriodicCallback. (A PeriodicCallback takes two arguments: the function to run and the number of milliseconds between calls.)

  • Is there such an equivalent in asyncio?
  • If not, what would be the cleanest way to implement this without running the risk of getting a RecursionError after a while?
2Cubed
  • 3,401
  • 7
  • 23
  • 40

9 Answers9

95

For Python versions below 3.5:

import asyncio

@asyncio.coroutine
def periodic():
    while True:
        print('periodic')
        yield from asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

For Python 3.5 and above:

import asyncio

async def periodic():
    while True:
        print('periodic')
        await asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
Lonami
  • 5,945
  • 2
  • 20
  • 38
A. Jesse Jiryu Davis
  • 23,641
  • 4
  • 57
  • 70
  • 6
    Even in Tornado, I'd recommend a loop like this instead of a `PeriodicCallback` for applications that make use of coroutines. – Ben Darnell May 29 '16 at 18:17
  • 9
    Just a quick note: Don’t directly create `Task` instances; use the `ensure_future()` function or the `AbstractEventLoop.create_task()` method. From the [asyncio documentation](https://docs.python.org/3.5/library/asyncio-task.html#asyncio.Task). – Torkel Bjørnson-Langen Jan 05 '17 at 22:38
  • A lambda may be used instead instead of the `stop` function. I.e.: `loop.call_later(5, lambda: task.cancel())` – Torkel Bjørnson-Langen Jan 05 '17 at 22:41
  • 30
    Or you can just call it like `loop.call_later(5, task.cancel)`. – ReWrite Jan 19 '17 at 01:32
  • 5
    Just a note for Python 3.7: From the [asyncio doc](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task), we should use the high-level `asyncio.create_task()` to create `Task`s. – mhchia Jan 23 '19 at 08:37
  • How come you `asyncio.sleep` and `loop.call_later`? Is this to prevent a `RecursionError`? – 8bitme Jul 26 '19 at 06:13
  • You can also use `while await asyncio.sleep(n, result=True): ...` to wait at the beginning of the loop. – jirassimok Nov 16 '19 at 01:45
39

When you feel that something should happen "in background" of your asyncio program, asyncio.Task might be good way to do it. You can read this post to see how to work with tasks.

Here's possible implementation of class that executes some function periodically:

import asyncio
from contextlib import suppress


class Periodic:
    def __init__(self, func, time):
        self.func = func
        self.time = time
        self.is_started = False
        self._task = None

    async def start(self):
        if not self.is_started:
            self.is_started = True
            # Start task to call func periodically:
            self._task = asyncio.ensure_future(self._run())

    async def stop(self):
        if self.is_started:
            self.is_started = False
            # Stop task and await it stopped:
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task

    async def _run(self):
        while True:
            await asyncio.sleep(self.time)
            self.func()

Let's test it:

async def main():
    p = Periodic(lambda: print('test'), 1)
    try:
        print('Start')
        await p.start()
        await asyncio.sleep(3.1)

        print('Stop')
        await p.stop()
        await asyncio.sleep(3.1)

        print('Start')
        await p.start()
        await asyncio.sleep(3.1)
    finally:
        await p.stop()  # we should stop task finally


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Output:

Start
test
test
test

Stop

Start
test
test
test

[Finished in 9.5s]

As you see on start we just start task that calls some functions and sleeps some time in endless loop. On stop we just cancel that task. Note, that task should be stopped at the moment program finished.

One more important thing that your callback shouldn't take much time to be executed (or it'll freeze your event loop). If you're planning to call some long-running func, you possibly would need to run it in executor.

Community
  • 1
  • 1
Mikhail Gerasimov
  • 36,989
  • 16
  • 116
  • 159
  • The most complete and clear answer so far! Thanks. Is it good idea to require the `func` to be a coroutine, so we can: `await self.func()` in the `_run` method? – Sergey Belash Jan 30 '17 at 07:23
  • 1
    @SergeyBelash, sure, it'll be ok. Note only that since we cancel task at random time, your func may be also cancelled at random time. It means every await line inside your function can potentially raise CancelledError. But it's actual for every async function at all (just like KeyboardInterrupt can be raised randomly in regular non-async code). – Mikhail Gerasimov Jan 30 '17 at 08:02
  • I worry with this (and other answers) that the repeat rate won't be exactly the time value. If func takes an appreciable time to execute it won't even be close, and over a long period it will drift even if func takes negligible time. – Ian Goldby Dec 04 '17 at 11:29
  • Strictly speaking, `start()` does not need to be `async`. – fgiraldeau Mar 09 '20 at 16:07
  • This can be upgraded to support both normal and async functions: ``` async def _run(self): while True: await asyncio.sleep(self.time) # Supporting normal and async functions res = self.func() if inspect.isawaitable(res): await res ``` – Airstriker Dec 31 '20 at 00:43
31

A variant that may be helpful: if you want your recurring call to happen every n seconds instead of n seconds between the end of the last execution and the beginning of the next, and you don't want calls to overlap in time, the following is simpler:

async def repeat(interval, func, *args, **kwargs):
    """Run func every interval seconds.

    If func has not finished before *interval*, will run again
    immediately when the previous iteration finished.

    *args and **kwargs are passed as the arguments to func.
    """
    while True:
        await asyncio.gather(
            func(*args, **kwargs),
            asyncio.sleep(interval),
        )

And an example of using it to run a couple tasks in the background:

async def f():
    await asyncio.sleep(1)
    print('Hello')


async def g():
    await asyncio.sleep(0.5)
    print('Goodbye')


async def main():
    t1 = asyncio.ensure_future(repeat(3, f))
    t2 = asyncio.ensure_future(repeat(2, g))
    await t1
    await t2

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Fred Ross
  • 494
  • 5
  • 2
  • Thank you! I had this problem while my server was under heavy load, and over many repetitions we started to get clock skew. This solves it elegantly. – Christian Oudard May 06 '20 at 14:41
  • 2
    Why do you use ensure_future in main()? Why not simply `await repeat(3, f)` and `await repeat(2, g)`? – marcoc88 Oct 21 '20 at 11:37
  • what if you want f or g to return a value? – eugene Jan 02 '22 at 16:29
  • @marcoc88 if you simply await repeat, the rest of the code will be executed once the first repeat ends. – Burak Yildiz Aug 14 '23 at 22:08
  • @eugene you can do it with an extra callback function passed to the repeat function. After each asyncio.gather finishes, it will return a list of return values. The first element of the list will be the return value of the func. Then you can simply call callback with that value. – Burak Yildiz Aug 14 '23 at 22:17
28

There is no built-in support for periodic calls, no.

Just create your own scheduler loop that sleeps and executes any tasks scheduled:

import math, time

async def scheduler():
    while True:
        # sleep until the next whole second
        now = time.time()
        await asyncio.sleep(math.ceil(now) - now)
       
        # execute any scheduled tasks
        async for task in scheduled_tasks(time.time()):
            await task()

The scheduled_tasks() iterator should produce tasks that are ready to be run at the given time. Note that producing the schedule and kicking off all the tasks could in theory take longer than 1 second; the idea here is that the scheduler yields all tasks that should have started since the last check.

Pynchia
  • 10,996
  • 5
  • 34
  • 43
Martijn Pieters
  • 1,048,767
  • 296
  • 4,058
  • 3,343
  • The `asyncio` event loop has a `time()` method that could be used in place of the `time` module. – krs013 May 26 '17 at 04:34
  • 4
    @krs013: That's a *different clock*; it doesn't necessarily give you real-world time (it depends on the event loop implementation, and can measure CPU time ticks or another monotonically increasing clock measure). Because it is not guaranteed to provide a measure in seconds, it should **not** be used here. – Martijn Pieters May 26 '17 at 06:29
  • Oh, good point, thanks. I figured that it would be good enough for interval timing, but it looks like no guarantee is made for accuracy in sleeping threads. The implementations I've seen seem to just use the machines uptime in nanoseconds, but yeah, you're right. I think I have some code to fix now... – krs013 May 26 '17 at 06:37
  • The [docstring](https://github.com/python/cpython/blob/e5d67f1e31381d28b24f6e1c0f8388d9bf0bfc5f/Lib/asyncio/base_events.py#L626-L633) of the `loop.time` method states "This is a float expressed in seconds since an epoch, but the epoch, precision, accuracy and drift are unspecified and may differ per event loop." Here I interpret this as "SI seconds since an epoch" therefore CPU time ticks, or other non "uniform" clocks do not qualify as valid for `loop.time()`. Since the OP just asked for a periodic callback every x milliseconds, it seems to me that `loop.time()` is adequate for the purpose. – Stefano M Aug 30 '19 at 21:35
  • @StefanoM: yes, it *may* be adequate, but is event-loop-implementation dependent and the docstring gives implementations plenty of leeway. It may be good enough for repeating tasks, but my answer describers a *scheduler*, which often needs to do cron-like things (e.g. run tasks at specific real-world times). – Martijn Pieters Aug 31 '19 at 00:40
  • @ChristianOudard: same response for you: `time.monotic()` is great for measuring elapsed time, not for scheduling items against a wall-clock time value. – Martijn Pieters May 06 '20 at 15:21
  • I would recommend the APSScheduler lib with asyncio support – jmoz Mar 18 '22 at 04:20
13

Alternative version with decorator for python 3.7

import asyncio
import time


def periodic(period):
    def scheduler(fcn):

        async def wrapper(*args, **kwargs):

            while True:
                asyncio.create_task(fcn(*args, **kwargs))
                await asyncio.sleep(period)

        return wrapper

    return scheduler


@periodic(2)
async def do_something(*args, **kwargs):
    await asyncio.sleep(5)  # Do some heavy calculation
    print(time.time())


if __name__ == '__main__':
    asyncio.run(do_something('Maluzinha do papai!', secret=42))
6

Based on @A. Jesse Jiryu Davis answer (with @Torkel Bjørnson-Langen and @ReWrite comments) this is an improvement which avoids drift.

import time
import asyncio

@asyncio.coroutine
def periodic(period):
    def g_tick():
        t = time.time()
        count = 0
        while True:
            count += 1
            yield max(t + count * period - time.time(), 0)
    g = g_tick()

    while True:
        print('periodic', time.time())
        yield from asyncio.sleep(next(g))

loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
Neuron
  • 5,141
  • 5
  • 38
  • 59
Wojciech Migda
  • 738
  • 7
  • 16
  • 2
    `periodic` should probably use `loop.time()` in preference to `time.time()` because `loop.time()` is the time reference internally used by `asyncio.sleep()`. `loop.time()` returns monotonic time, while `time.time()` returns wallclock time. The two will differ e.g. when a system administrator modifies the date on the system, or when NTP adjusts wallclock time. – user4815162342 Feb 04 '18 at 21:29
  • this answer needs an update for python 3.10, i dont think anyone does @asyncio.couroutine or loop.run_until_complete in the newer python versions anymore – PirateApp Aug 29 '22 at 08:17
  • @user4815162342 mind sharing where is this loop.time i tried looking it up and got nothing and tried from asyncio import loop and from asyncio.loop import time, none of those work – PirateApp Aug 29 '22 at 10:59
  • 1
    @PirateApp Use `loop = asyncio.get_event_loop()`. – user4815162342 Aug 29 '22 at 11:44
2

This solution uses the decoration concept from Fernando José Esteves de Souza, the drifting workaround from Wojciech Migda and a superclass in order to generate most elegant code as possible to deal with asynchronous periodic functions.

Without threading.Thread

The solution is comprised of the following files:

  • periodic_async_thread.py with the base class for you to subclass
  • a_periodic_thread.py with an example subclass
  • run_me.py with an example instantiation and run

The PeriodicAsyncThread class in the file periodic_async_thread.py:

import time
import asyncio
import abc

class PeriodicAsyncThread:
    def __init__(self, period):
        self.period = period

    def periodic(self):
        def scheduler(fcn):
            async def wrapper(*args, **kwargs):
                def g_tick():
                    t = time.time()
                    count = 0
                    while True:
                        count += 1
                        yield max(t + count * self.period - time.time(), 0)
                g = g_tick()

                while True:
                    # print('periodic', time.time())
                    asyncio.create_task(fcn(*args, **kwargs))
                    await asyncio.sleep(next(g))
            return wrapper
        return scheduler

    @abc.abstractmethod
    async def run(self, *args, **kwargs):
        return

    def start(self):
        asyncio.run(self.run())

An example of a simple subclass APeriodicThread in the file a_periodic_thread.py:

from periodic_async_thread import PeriodicAsyncThread
import time
import asyncio

class APeriodicThread(PeriodicAsyncThread):
    def __init__(self, period):
        super().__init__(period)
        self.run = self.periodic()(self.run)
    
    async def run(self, *args, **kwargs):
        await asyncio.sleep(2)
        print(time.time())

Instantiating and running the example class in the file run_me.py:

from a_periodic_thread import APeriodicThread
apt = APeriodicThread(2)
apt.start()

This code represents an elegant solution that also mitigates the time drift problem of other solutions. The output is similar to:

1642711285.3898764
1642711287.390698
1642711289.3924973
1642711291.3920736

With threading.Thread

The solution is comprised of the following files:

  • async_thread.py with the canopy asynchronous thread class.
  • periodic_async_thread.py with the base class for you to subclass
  • a_periodic_thread.py with an example subclass
  • run_me.py with an example instantiation and run

The AsyncThread class in the file async_thread.py:

from threading import Thread
import asyncio
import abc

class AsyncThread(Thread):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    @abc.abstractmethod
    async def async_run(self, *args, **kwargs):
        pass

    def run(self, *args, **kwargs):
        # loop = asyncio.new_event_loop()
        # asyncio.set_event_loop(loop)

        # loop.run_until_complete(self.async_run(*args, **kwargs))
        # loop.close()
        asyncio.run(self.async_run(*args, **kwargs))

The PeriodicAsyncThread class in the file periodic_async_thread.py:

import time
import asyncio
from .async_thread import AsyncThread

class PeriodicAsyncThread(AsyncThread):
    def __init__(self, period, *args, **kwargs):
        self.period = period
        super().__init__(*args, **kwargs)
        self.async_run = self.periodic()(self.async_run)

    def periodic(self):
        def scheduler(fcn):
            async def wrapper(*args, **kwargs):
                def g_tick():
                    t = time.time()
                    count = 0
                    while True:
                        count += 1
                        yield max(t + count * self.period - time.time(), 0)
                g = g_tick()

                while True:
                    # print('periodic', time.time())
                    asyncio.create_task(fcn(*args, **kwargs))
                    await asyncio.sleep(next(g))
            return wrapper
        return scheduler

An example of a simple subclass APeriodicThread in the file a_periodic_thread.py:

import time
from threading import current_thread
from .periodic_async_thread import PeriodicAsyncThread
import asyncio

class APeriodicAsyncTHread(PeriodicAsyncThread):
    async def async_run(self, *args, **kwargs):
        print(f"{current_thread().name} {time.time()} Hi!")
        await asyncio.sleep(1)
        print(f"{current_thread().name} {time.time()} Bye!")

Instantiating and running the example class in the file run_me.py:

from .a_periodic_thread import APeriodicAsyncTHread
a = APeriodicAsyncTHread(2, name = "a periodic async thread")
a.start()
a.join()

This code represents an elegant solution that also mitigates the time drift problem of other solutions. The output is similar to:

a periodic async thread 1643726990.505269 Hi!
a periodic async thread 1643726991.5069854 Bye!
a periodic async thread 1643726992.506919 Hi!
a periodic async thread 1643726993.5089169 Bye!
a periodic async thread 1643726994.5076022 Hi!
a periodic async thread 1643726995.509422 Bye!
a periodic async thread 1643726996.5075526 Hi!
a periodic async thread 1643726997.5093904 Bye!
a periodic async thread 1643726998.5072556 Hi!
a periodic async thread 1643726999.5091035 Bye!
DsAtHuh
  • 21
  • 2
1

For multiple types of scheduling I'd recommend APSScheduler which has asyncio support.

I use it for a simple python process I can fire up using docker and just runs like a cron executing something weekly, until I kill the docker/process.

alecxe
  • 462,703
  • 120
  • 1,088
  • 1,195
jmoz
  • 7,846
  • 5
  • 31
  • 33
0

This is what I did to test my theory of periodic call backs using asyncio. I don't have experience using Tornado, so I'm not sure exactly how the periodic call backs work with it. I am used to using the after(ms, callback) method in Tkinter though, and this is what I came up with. While True: Just looks ugly to me even if it is asynchronous (more so than globals). The call_later(s, callback, *args) method uses seconds not milliseconds though.

import asyncio
my_var = 0
def update_forever(the_loop):
    global my_var
    print(my_var)
    my_var += 1 
    # exit logic could be placed here
    the_loop.call_later(3, update_forever, the_loop)  # the method adds a delayed callback on completion

event_loop = asyncio.get_event_loop()
event_loop.call_soon(update_forever, event_loop)
event_loop.run_forever()
ShayneLoyd
  • 633
  • 1
  • 4
  • 9