11

I would like to implement a simple watchdog timer in Python with two use cases:

  • The watchdog ensures that a function doesn't execute for longer than x seconds
  • The watchdog ensures that a certain regularly executed function does indeed execute at least every y seconds

How do I do that?

Sergiy Belozorov
  • Here's [`WatchdogTimer` implementation that creates only one thread](https://stackoverflow.com/a/34115590/4279) – jfs Aug 30 '17 at 17:36

3 Answers

13

Just publishing my own solution to this:

from threading import Timer

class Watchdog(Exception):
    def __init__(self, timeout, userHandler=None):  # timeout in seconds
        self.timeout = timeout
        self.handler = userHandler if userHandler is not None else self.defaultHandler
        self.timer = Timer(self.timeout, self.handler)
        self.timer.start()

    def reset(self):
        self.timer.cancel()
        self.timer = Timer(self.timeout, self.handler)
        self.timer.start()

    def stop(self):
        self.timer.cancel()

    def defaultHandler(self):
        raise self

Usage if you want to make sure a function finishes in less than x seconds:

watchdog = Watchdog(x)
try:
    # do something that might take too long
    pass
except Watchdog:
    # handle watchdog error
    pass
watchdog.stop()
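
For the first use case, one way to have the timeout reported in the calling thread is to run the function in a worker thread and wait on its future with a timeout. This is only a sketch under assumptions: `call_with_timeout` and `slow` are hypothetical names that do not appear in the answer, and the approach swaps in the standard library's `concurrent.futures` in place of `threading.Timer`. It bounds how long the caller waits; it does not stop the function, which keeps running in its worker thread.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout


def call_with_timeout(func, timeout, *args, **kwargs):
    """Return func's result, or raise FutureTimeout if it is not ready in time.

    Hypothetical helper: it limits the caller's wait, not func's run time.
    """
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(func, *args, **kwargs)
        # result() blocks the caller for at most `timeout` seconds.
        return future.result(timeout=timeout)
    finally:
        # Do not wait for the worker here: it may still be running.
        executor.shutdown(wait=False)


def slow():
    # Stand-in for "something that might take too long".
    time.sleep(0.5)
    return "finished"


try:
    result = call_with_timeout(slow, 0.1)
except FutureTimeout:
    # The caller gave up; slow() still runs to completion in its thread.
    result = None
```

Because the exception is raised by `future.result()` in the caller, a `try`/`except` around the call can actually catch it.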

Usage if you regularly execute something and want to make sure it is executed at least every y seconds:

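import os

def myHandler():
    print("Whoa! Watchdog expired. Holy heavens!", flush=True)
    # The handler runs in the Timer thread, where sys.exit() would end
    # only that thread, so terminate the whole process instead.
    os._exit(1)

watchdog = Watchdog(y, myHandler)

def doSomethingRegularly():
    # do the work here; on every return path, call watchdog.reset() before returning
    watchdog.reset()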
thorwhalen
Sergiy Belozorov
  • Ever heard of PEP8? And what is `Timer()` here? – Zaur Nasibov Apr 22 '13 at 13:49
  • Nope, haven't heard about it. I am a beginner. But I will check once I have more time for this... – Sergiy Belozorov Apr 22 '13 at 13:51
  • I'm not sure about your `reset` method. In your `__init__` your `Timer` switches based on if `userHandler` (which should be `is not None` BTW) is supplied, in which case it's used. In `reset` - it's always `self.handler` that will then be used... – Jon Clements Apr 22 '13 at 13:54
  • applied PEP8, fixed bug in reset. Thanks. – Sergiy Belozorov Apr 22 '13 at 13:59
  • There is also the [`signal module`](http://docs.python.org/2/library/signal.html#example) and [`signal.alarm`](http://docs.python.org/2/library/signal.html#signal.alarm) could be used for a timer. – JamesThomasMoon Nov 09 '13 at 04:03
  • I think the `__init__` function and `reset` function need to call [`self.timer.start()`](http://docs.python.org/2/library/threading.html#timer-objects). – JamesThomasMoon Nov 09 '13 at 04:28
  • [This answer for handling timeout](https://stackoverflow.com/a/22348885/758174) is better (and can be used in a `with` context). – Pierre D Dec 03 '18 at 15:29
  • This `Watchdog` raises the `Exception` (itself) in a separate thread (the `Timer` thread), so the `try`/`except` in the usage example is useless. The `Watchdog` also does not interrupt the long operation, which would be the point of a watchdog. – rtoijala Feb 22 '21 at 07:07
  • Confirmed @rtoijala comment. This should not be the selected answer since it's wrong. – Travis Oct 20 '21 at 15:05
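
The point raised in the last two comments can be checked with a small sketch. `Expired`, `raise_expired` and `demo` are hypothetical names standing in for the answer's `Watchdog` and its default handler, and `threading.excepthook` assumes Python 3.8 or later. The exception raised by the `Timer` callback surfaces in the timer thread, so the `except` clause in the main thread never sees it and the sleep is not interrupted.

```python
import threading
import time


class Expired(Exception):
    """Hypothetical stand-in for the answer's Watchdog exception."""


def raise_expired():
    raise Expired()


def demo():
    seen_in_timer_thread = []
    caught_in_main = False
    previous_hook = threading.excepthook
    # threading.excepthook receives exceptions that escape a thread's run().
    threading.excepthook = lambda args: seen_in_timer_thread.append(args.exc_type)
    try:
        timer = threading.Timer(0.1, raise_expired)
        timer.start()
        try:
            time.sleep(0.3)  # the "long operation"; it runs to completion
        except Expired:
            caught_in_main = True
        timer.join()
    finally:
        threading.excepthook = previous_hook
    return caught_in_main, seen_in_timer_thread


caught_in_main, seen_in_timer_thread = demo()
```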
2

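signal.alarm() schedules a SIGALRM for your process after the given number of seconds; the default action for that signal is to terminate the process. Call it at the top of your main loop, with the timeout set to the greater of the two times you are prepared to tolerate. Each call replaces the previously scheduled alarm, so the program survives only as long as the loop keeps coming around in time (Unix only):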

import signal
while True:
    signal.alarm(10)
    infloop()
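
If terminating the whole process is too drastic, the same signal can be turned into an exception instead. The following is a minimal sketch, not part of the answer: `WatchdogTimeout`, `time_limit` and `run_guarded` are hypothetical names, and it assumes a Unix system and that the code runs in the main thread, since Python only allows signal handlers to be installed there.

```python
import signal
import time
from contextlib import contextmanager


class WatchdogTimeout(Exception):
    """Hypothetical exception raised when the time limit expires."""


@contextmanager
def time_limit(seconds):
    def on_alarm(signum, frame):
        # Runs in the main thread, so the exception interrupts the guarded code.
        raise WatchdogTimeout(f"no completion within {seconds} s")

    previous = signal.signal(signal.SIGALRM, on_alarm)
    # setitimer accepts fractional seconds; signal.alarm takes whole seconds.
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)    # cancel a pending alarm
        signal.signal(signal.SIGALRM, previous)    # restore the old handler


def run_guarded(func, seconds):
    """Call func(), returning "timed out" if it exceeds the limit."""
    try:
        with time_limit(seconds):
            return func()
    except WatchdogTimeout:
        return "timed out"
```

For example, `run_guarded(lambda: time.sleep(1), 0.1)` gives up after about a tenth of a second, while a function that returns in time has its result passed through.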
Des Cent
0

Here is a watchdog timer (wdt) I use in my app, written without a class. There is no way to stop it:

from threading import Event, Thread

def wdt(time, callback):
    # a reset flag
    reset_e = Event()
    # a function to reset the wdt
    def reset(): reset_e.set()
    # the function to run in a different thread
    def checker():
        # check if reset flag is set.
        # wait for specified time to give chance to reset.
        while reset_e.wait(time):
            # it was set in time. clear and wait again
            reset_e.clear()
        # time ran out.
        callback()
    # the event is not set by default. Set it
    reset()
    # create and start the wdt
    t = Thread(target=checker)
    t.start()
    # return the resetter
    return reset


# The callback to run if wdt is not reset
def bark():
    print('woof')

# Test
import time
reset = wdt(1.0, bark)
time.sleep(0.9)
reset()
time.sleep(2.0)
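
If a way to stop the timer is needed, one possible variation is to add a second event. This is only a sketch of that idea, not the author's code: `stoppable_wdt`, `stop` and `stop_e` are hypothetical names, and the checker thread is made a daemon here so that it cannot keep the interpreter alive.

```python
import time
from threading import Event, Thread


def stoppable_wdt(timeout, callback):
    reset_e = Event()
    stop_e = Event()  # hypothetical addition: set once the wdt is stopped

    def reset():
        reset_e.set()

    def stop():
        stop_e.set()
        reset_e.set()  # wake the checker so it can notice the stop request

    def checker():
        while reset_e.wait(timeout):
            if stop_e.is_set():
                return
            # it was reset in time: clear the flag and wait again
            reset_e.clear()
        # time ran out; stay silent if a stop request arrived meanwhile
        if not stop_e.is_set():
            callback()

    reset_e.set()  # same initial state as the original wdt
    thread = Thread(target=checker, daemon=True)
    thread.start()
    return reset, stop, thread


# Stopped in time: the callback never runs.
fired = []
reset, stop, thread = stoppable_wdt(0.2, lambda: fired.append("woof"))
time.sleep(0.05)
reset()
stop()
thread.join(1.0)

# Never reset: the callback runs once after the timeout.
fired_late = []
_, _, late_thread = stoppable_wdt(0.1, lambda: fired_late.append("woof"))
late_thread.join(1.0)
```

The second `stop_e` check after the loop covers the case where `stop()` is called just as the checker clears the reset flag; the thread may then take up to one more timeout to exit, but it does not call the callback.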
Jolbas