4

First of all, I am new to Python and not familiar with its functionalities. I've been mainly using MATLAB.

PC brief spec.: Windows 10, Intel i7

I am trying to make a timer class for periodic execution of a function such as MATLAB has, which is obviously borrowed from Java timer. The MATLAB timer has an about 1 ms resolution and I've never seen it exceeds 2 ms in any situation. In fact, it is accurate enough for my project.

Recently, I planned to move to Python because of the poor parallel computing and web access features of MATLAB. However, unfortunately, the standard packages of Python offer somewhat low-level of timer (threading.Timer) compared to MATLAB that I had to make my own timer class. First, I referred to the QnA Executing periodic actions in Python [duplicate]. The solution suggested by Michael Anderson gives a simple idea of drift correction. He used time.sleep() to keep the period. The approach is highly accurate and sometimes showed better accuracy over the MATLAB timer. approx. 0.5 ms resolution. However, the timer cannot be interrupted (pause or resume) during being captured in time.sleep(). But I sometimes have to stop immediately regardless of whether it is in sleep() or not.

A solution to the problem I found is to utilize the Event class in threading package. Refer to Python threading.timer - repeat function every 'n' seconds . Using the timeout feature of Event.wait(), I could make a time gap between executions and it is used to keep the period. That is, the event is usually cleared so that wait(timeout) can act like time.sleep(interval) and I could exit from wait() immediately, when needed, by setting event.

Everything seemed fine then but there is a critical problem in Event.wait(). The time delay varies too largely from 1 ~ 15 ms. I think it comes from the overhead of Event.wait().

I made an example code that shows accuracy comparison between time.sleep() and Event.wait(). This sums total of 1000 iterations of 1 ms sleep() and wait() to see the accumulated time error. The expected result is about 1.000.

import time
from threading import Event

time.sleep(3)  # to relax

# time.sleep()
tspan = 1
N = 1000
t1 = time.perf_counter()
for _ in range(N):
    time.sleep(tspan/N)
t2 = time.perf_counter()

print(t2-t1)

time.sleep(3)  # to relax

# Event.wait()    
tspan = 1
event = Event()
t1 = time.perf_counter()
for _ in range(N):
    event.wait(tspan/N)
t2 = time.perf_counter()

print(t2-t1)

Result:

1.1379848184879964
15.614547161211096

The result shows that time.sleep() is much better in accuracy. But I cannot purely rely on time.sleep() as previously mentioned.

In summary,

  • time.sleep(): accurate but not interruptible
  • threading.Event.wait(): inaccurate but interruptible

I am currently thinking of a compromise: just as in the example, make a loop of tiny time.sleep() (of 0.5 ms interval) and exit the loop using if-statement and break when needed. As far as I know, the method is used in Python 2.x Python time.sleep() vs event.wait().

It was a verbose introduction, but my question can be summarized as follows.

  1. Can I force thread process to break from time.sleep() by an external signal or event? (This seems to be most efficient.???)

  2. To make Event.wait() more accurate or to reduce overhead time.

  3. Are there any better approaches aside of sleep() and Event.wait() approach to improve timing precision.

Thank you very much.

Hermis14
  • 214
  • 3
  • 12

3 Answers3

0

I ran into the same timing issue with Event.wait(). The solution I came up with was to create a class which mimics threading.Event. Internally, it uses a combination of a time.sleep() loop and a busy loop for greatly increased precision. The sleep loop runs in a separate thread so that the blocking wait() call in the main thread can still be immediately interrupted. When the set() method is called, the sleep thread should terminate shortly afterwards. Also, in order to minimize CPU utilization, I made sure that the busy loop will never run for more than 3 milliseconds.

Here is my custom Event class along with a timing demo at the end (the printed execution times from the demo will be in nanoseconds):

import time
import _thread
import datetime


class Event:
    __slots__ = (
        "_flag", "_lock", "_nl",
        "_pc", "_waiters"
    )

    _lock_type = _thread.LockType
    _timedelta = datetime.timedelta
    _perf_counter = time.perf_counter
    _new_lock = _thread.allocate_lock

    class _switch:
        __slots__ = ("_on",)

        def __call__(self, on: bool = None):
            if on is None:
                return self._on

            self._on = on

        def __bool__(self):
            return self._on

        def __init__(self):
            self._on = False

    def clear(self):
        with self._lock:
            self._flag(False)

    def is_set(self) -> bool:
        return self._flag()

    def set(self):
        with self._lock:
            self._flag(True)
            waiters = self._waiters

            for waiter in waiters:
                waiter.release()

            waiters.clear()

    def wait(
        self,
        timeout: float = None
    ) -> bool:
        with self._lock:
            return self._wait(self._pc(), timeout)

    def _new_waiter(self) -> _lock_type:
        waiter = self._nl()
        waiter.acquire()
        self._waiters.append(waiter)
        return waiter

    def _wait(
        self,
        start: float,
        timeout: float,
        td=_timedelta,
        pc=_perf_counter,
        end: _timedelta = None,
        waiter: _lock_type = None,
        new_thread=_thread.start_new_thread,
        thread_delay=_timedelta(milliseconds=3)
    ) -> bool:
        flag = self._flag

        if flag:
            return True
        elif timeout is None:
            waiter = self._new_waiter()
        elif timeout <= 0:
            return False
        else:
            delay = td(seconds=timeout)
            end = td(seconds=start) + delay

            if delay > thread_delay:
                mark = end - thread_delay
                waiter = self._new_waiter()
                new_thread(
                    self._wait_thread,
                    (flag, mark, waiter)
                )

        lock = self._lock
        lock.release()

        try:
            if waiter:
                waiter.acquire()

            if end:
                while (
                    not flag and
                    td(seconds=pc()) < end
                ):
                    pass

        finally:
            lock.acquire()

            if waiter and not flag:
                self._waiters.remove(waiter)

        return flag()

    @staticmethod
    def _wait_thread(
        flag: _switch,
        mark: _timedelta,
        waiter: _lock_type,
        td=_timedelta,
        pc=_perf_counter,
        sleep=time.sleep
    ):
        while not flag and td(seconds=pc()) < mark:
            sleep(0.001)

        if waiter.locked():
            waiter.release()

    def __new__(cls):
        _new_lock = cls._new_lock
        _self = object.__new__(cls)
        _self._waiters = []
        _self._nl = _new_lock
        _self._lock = _new_lock()
        _self._flag = cls._switch()
        _self._pc = cls._perf_counter
        return _self


if __name__ == "__main__":
    def test_wait_time():
        wait_time = datetime.timedelta(microseconds=1)
        wait_time = wait_time.total_seconds()

        def test(
            event=Event(),
            delay=wait_time,
            pc=time.perf_counter
        ):
            pc1 = pc()
            event.wait(delay)
            pc2 = pc()
            pc1, pc2 = [
                int(nbr * 1000000000)
                for nbr in (pc1, pc2)
            ]
            return pc2 - pc1

        lst = [
            f"{i}.\t\t{test()}"
            for i in range(1, 11)
        ]
        print("\n".join(lst))

    test_wait_time()
    del test_wait_time
Chris D
  • 351
  • 1
  • 13
0

Chris D's custom Event class works impressively well! For practical purposes, I have included it into an installable package (https://github.com/ovinc/oclock, install with pip install oclock) that also includes other timing tools. From version 1.3.0 of oclock and onwards, one can use the custom Event class discussed in Chris D's answer, e.g.

from oclock import Event
event = Event()
event.wait(1)

with the usual set(), clear(), is_set(), wait() methods of the Event class.

The timing accuracy is much better than with threading.Event, in Windows in particular. For example on a Windows machine with 1000 repeated loops, I get a standard deviation in the duration of the loop of 7ms for threading.Event and less than 0.01 ms for oclock.Event. Props to Chris D!

Note: The oclock package is under the GPLv3 license for compatibility with StackOverflow's CC BY-SA 4.0.

0

Thank you for this topic and all answers. I also experienced some troubles with inaccurate timing (Windows 10 + Python 3.9 + Threading).

The solution is to use oclock package and also change (temporarily) resolution of Windows system timer by wres package. This package utilizes undocumented Windows API function NtSetTimerResolution (warning: resolution is changed system-wide).

Application of oclock package only does not solve the problem.

With both python packages applied, the code below schedules periodic event correctly and precisely enough. If terminated, original timer resolution is restored.

import threading
import datetime
import time
import oclock
import wres

class Job(threading.Thread):
    def __init__(self, interval, *args, **kwargs):
        threading.Thread.__init__(self)
        # use oclock.Event() instead of threading.Event()
        self.stopped = oclock.Event()
        self.interval = interval.total_seconds()
        self.args = args
        self.kwargs = kwargs

    def stop(self):
        self.stopped.set()
        self.join()

    def run(self):
        prevTime = time.time()
        while not self.stopped.wait(self.interval):
            now = time.time()
            print(now - prevTime)
            prevTime = now

# Set system timer resolution to 1 ms
# Automatically restore previous resolution when exit with statement
with wres.set_resolution(10000):
    # Create thread with periodic task called every 10 ms
    job = Job(interval=datetime.timedelta(seconds=0.010))
    job.start()

    try:
        while True:
            time.sleep(1)
    # Hit Ctrl+C to terminate main loop and spawned thread
    except KeyboardInterrupt:
        job.stop()
onvav
  • 11
  • 2