500

I'm calling a function in Python which I know may stall and force me to restart the script.

How do I call the function or what do I wrap it in so that if it takes longer than 5 seconds the script cancels it and does something else?

– Teifion

24 Answers

335

You may use the signal package if you are running on UNIX:

In [1]: import signal

# Register a handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indeterminate time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception as exc: 
   ...:     print(exc)
   ...: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

Ten seconds after the call to signal.alarm(10), the handler is called. This raises an exception that you can intercept in the regular Python code.

This module doesn't play well with threads (but then, who does?)

Note that, since we raise an exception when the timeout happens, it may end up caught and ignored inside the function. Here is an example of one such function:

import time

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue
– piro
  • I use Python 2.5.4. There is such an error: Traceback (most recent call last): File "aa.py", line 85, in func signal.signal(signal.SIGALRM, handler) AttributeError: 'module' object has no attribute 'SIGALRM' – flypen May 13 '11 at 01:59
  • @flypen that's because `signal.alarm` and the related `SIGALRM` are not available on Windows platforms. – Double AA Aug 19 '11 at 16:20
  • If there are a lot of processes, and each calls `signal.signal` --- will they all work properly? Won't each `signal.signal` call cancel a "concurrent" one? – brownian May 10 '12 at 08:28
  • Warning for those wishing to use this with a C extension: the Python signal handler won't be called until the C function returns control to the Python interpreter. For this use case, use ATOzTOA's answer: http://stackoverflow.com/a/14924210/1286628 – wkschwartz Feb 20 '14 at 20:25
  • I second the warning about threads. signal.alarm only works in the main thread. I tried to use this in Django views - immediate fail with verbiage about main thread only. – JL Peyret Apr 02 '15 at 06:51
  • I want to set a timeout of 0.25 seconds, but `signal.alarm()` only takes an integer. How do I do this? (A `setitimer`-based sketch follows these comments.) – mjsxbo Dec 21 '17 at 07:05
  • For me this didn't work; my code was hanging during the connect to an external website. Maybe also because I used it in a Django command, but anyway the `multiprocessing` answer below was much better. – The Godfather May 04 '18 at 07:34
  • This code did not work for me when I used a C++ extension in my code and it's the C++ part that is hanging. – yuqli Apr 29 '19 at 03:04
  • What should I use if I'm on Windows? – alexdriedger Oct 21 '19 at 14:05
  • This has the side effect of the timeout being invokable from outside the program, i.e. from the command line, by issuing a SIGALRM to the running process. **Probably** that's okay in most cases, but absolutely not if a premature timeout has security implications, for example on I/O locks and file handles. – Teekin Dec 25 '19 at 15:40
  • If you need this: set the alarm back to 0 to cancel it with `signal.alarm(0)` (see https://stackoverflow.com/questions/27013127/stop-sigalrm-when-function-returns). – Michele Piccolini Jun 26 '20 at 14:45
  • Maybe this is expected behavior, but it took a while for me to understand: if you run this in a Jupyter environment, `signal.alarm(10)` will reset your kernel after the 10 seconds. – epifanio Apr 12 '21 at 14:37
  • Please note this will work only on Linux systems, as Windows doesn't have the `signal.SIGALRM` signal. At least, this is what happened to me. A good alternative which works on all systems is to use `multiprocessing`. – luigi Aug 12 '21 at 09:27
  • signal only works in the main thread. – IMXQD Sep 18 '21 at 03:32
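
On the fractional-second question above: signal.setitimer accepts floats, where signal.alarm only takes whole seconds. A minimal sketch, assuming a Unix main thread; time.sleep(1) stands in for the long-running call:

import signal
import time

def handler(signum, frame):
    raise TimeoutError("timed out")  # built-in TimeoutError, Python 3.3+

signal.signal(signal.SIGALRM, handler)
signal.setitimer(signal.ITIMER_REAL, 0.25)  # deliver SIGALRM after 0.25 s
try:
    time.sleep(1)  # stand-in for the possibly stalling call
except TimeoutError as exc:
    print(exc)
finally:
    signal.setitimer(signal.ITIMER_REAL, 0)  # cancel the timer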
232

You can use multiprocessing.Process to do exactly that.

Code

import multiprocessing
import time

def bar():
    for i in range(100):
        print("Tick")
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until the process finishes
    p.join(10)

    # If the process is still active
    if p.is_alive():
        print("running... let's kill it...")

        # Terminate - may not work if the process is stuck for good
        p.terminate()
        # OR kill - will work for sure, but gives the process no chance to finish nicely
        # p.kill()

        p.join()
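
A question that comes up in the comments below is how to get the target's return value back. One way, sketched here (not part of the original answer), is to pass a multiprocessing.Queue to the target:

import multiprocessing
import time

def bar(q):
    time.sleep(1)
    q.put("bar's result")  # hand the return value back to the parent

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=bar, args=(q,))
    p.start()
    p.join(10)  # wait up to 10 seconds

    if p.is_alive():
        p.terminate()  # timed out; no result available
        p.join()
    else:
        print(q.get())  # prints "bar's result"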
– ATOzTOA
  • How can I get the return value of the target method? – bad_keypoints Aug 11 '15 at 07:05
  • This doesn't seem to work if the called function gets stuck on an I/O block. – sudo Jul 29 '16 at 18:35
  • @bad_keypoints See this answer: http://stackoverflow.com/a/10415215/1384471 Basically, you pass a list along that you put the answer into. – Peter Dec 15 '16 at 10:19
  • @sudo then remove the `join()`. That makes your x number of concurrent subprocesses keep running until they finish their work, or for the amount defined in `join(10)`. In case you have blocking I/O for 10 processes, using join(10) you have set them all to wait a maximum of 10 for EACH process that has started. Use the daemon flag like in this example http://stackoverflow.com/a/27420072/2480481, or pass `daemon=True` directly to `multiprocessing.Process()`. – m3nda Jan 02 '17 at 11:35
  • @ATOzTOA the problem with this solution, at least for my purposes, is that it potentially does not allow child threads to clean up after themselves. From the documentation of the terminate function: `terminate() ... Note that exit handlers and finally clauses, etc., will not be executed. Note that descendant processes of the process will not be terminated – they will simply become orphaned.` – abalcerek May 10 '17 at 14:03
  • If your `bar` accepts some args, you may use the `args` argument of the Process constructor. – The Godfather May 04 '18 at 07:35
  • Is my assumption correct that this can't be done with a thread instead of a process using `threading.Thread.join`, since threads can't be killed in Python? https://eli.thegreenplace.net/2011/08/22/how-not-to-set-a-timeout-on-a-computation-in-python – jakob.j Jul 20 '18 at 14:04
  • Is it possible to use coroutines to achieve the same thing? – ruxtain Aug 17 '18 at 04:59
  • @bad_keypoints I am guessing you are looking for this: https://docs.python.org/3.5/library/multiprocessing.html?highlight=process#multiprocessing.Process.exitcode You can look at p.exitcode to see if the process timed out or exited normally. – Can Kavaklıoğlu Apr 16 '19 at 19:13
  • Thanks for an answer that works on more than just Unix! – Nathaniel Jones Apr 22 '19 at 21:00
  • Why is there a p.join() after the termination? – yabchexu Oct 08 '19 at 01:48
  • @yabchexu `p.join()` will wait for the terminate to complete before proceeding. – ATOzTOA Oct 08 '19 at 13:24
  • If you want to get information from the function while it's running (not just an exit code), look into [shared data](https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes) (especially [managers](https://stackoverflow.com/a/10415215/5049813)) as @Peter recommended. – Pro Q Dec 02 '19 at 03:35
  • More info in the official docs: https://docs.python.org/3/library/multiprocessing.html#the-process-class – Josh Jul 01 '20 at 16:23
  • Why would you use `p.join(10)` instead of `time.sleep(10)`? – XYZT Sep 11 '21 at 19:55
  • @XYZT With `p.join(10)`, if the process finishes in 3 seconds, the call returns; `time.sleep(10)` will pause the program for the full 10 seconds regardless. – S.B Dec 17 '22 at 07:44
  • The only one that worked, thanks. I had to use kill() though. – Iszotic Mar 06 '23 at 21:59
116

How do I call the function or what do I wrap it in so that if it takes longer than 5 seconds the script cancels it?

I posted a gist that solves this question/problem with a decorator and a threading.Timer. Here it is with a breakdown.

Imports and setups for compatibility

It was tested with Python 2 and 3. It should also work under Unix/Linux and Windows.

First the imports. These attempt to keep the code consistent regardless of the Python version:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

Use version-independent code:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Now we have imported our functionality from the standard library.

exit_after decorator

Next we need a function to interrupt main() from the child thread:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

And here is the decorator itself:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

Usage

And here's the usage that directly answers your question about exiting after 5 seconds:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

Demo:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

The second function call will not finish; instead, the process should exit with a traceback.

KeyboardInterrupt does not always stop a sleeping thread

Note that sleep will not always be interrupted by a keyboard interrupt; on Python 2 on Windows, for example:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

Nor is it likely to interrupt code running in extensions unless the extension explicitly calls PyErr_CheckSignals(); see Cython, Python and KeyboardInterrupt ignored.

I would avoid sleeping a thread more than a second, in any case - that's an eon in processor time.

How do I call the function or what do I wrap it in so that if it takes longer than 5 seconds the script cancels it and does something else?

To catch it and do something else, you can catch the KeyboardInterrupt.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
– Russia Must Remove Putin
  • I didn't read your whole post yet, but I just wondered: what if flush is 0? That would be interpreted as False in the if-statement underneath, right? – Koenraad van Duin Mar 17 '19 at 07:17
  • Why do I have to call `thread.interrupt_main()`, why can't I directly raise an exception? – Anirban Nag 'tintinmj' Jul 30 '19 at 20:54
  • Any thoughts on wrapping `multiprocessing.connection.Client` with this? - Trying to solve: https://stackoverflow.com/questions/57817955/how-do-i-add-a-timeout-to-multiprocessing-connection-client-in-python-3-7 – wwii Sep 08 '19 at 16:59
  • It hangs on `thread.interrupt_main()` when I try a different function instead of countdown. For example, I run a `subprocess()` inside the count which isn't terminated even when the timer completes; I had to press `^C`. – alper Aug 06 '20 at 12:06
  • How do you stop all processes but not raise the error KeyboardInterrupt? – WJA Mar 04 '21 at 17:46
  • Can we raise an Exception instead of `return outer`? – alper Jan 01 '22 at 17:02
  • That wouldn't make any sense - our decorator would be broken if you did that. – Russia Must Remove Putin Jan 01 '22 at 17:34
66

I have a different proposal: a pure function (with the same API as the threading suggestion) that seems to work fine, based on suggestions in this thread:

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result
– Alex
  • You should also restore the original signal handler. See http://stackoverflow.com/questions/492519/timeout-on-a-python-function-call/494273#comment8635219_494273 – Martin Konecny Jun 11 '13 at 15:21
  • One more note: the Unix signal method only works if you are applying it in the main thread. Applying it in a sub-thread throws an exception and will not work. – Martin Konecny Jun 12 '13 at 20:23
  • This is not the best solution because it only works on Linux. – max Mar 13 '14 at 20:10
  • Max, not true - it works on any POSIX-compliant Unix. I think your comment should more accurately say it doesn't work on Windows. – Chris Johnson Nov 16 '15 at 19:41
  • You should avoid setting kwargs to an empty dict. A common Python gotcha is that default arguments on functions are mutable, so that dictionary will be shared across all calls to `timeout`. It is much better to set the default to `None` and, on the first line of the function, add `kwargs = kwargs or {}`. Args is okay because tuples are not mutable. (A corrected signature is sketched after these comments.) – scottmrogowski Aug 12 '16 at 17:13
  • Some gotchas I see... calling this from multiple threads would probably not work (which can be fixed by adding a mutex)... however, the greater issue is that if other code outside of this uses the SIGALRM handler, then that code would get broken by this. Because signals are a global resource, I prefer to only modify signals in the top-level Python source code, i.e. preferably right after a process is started. – Trevor Boyd Smith Nov 06 '17 at 15:55
  • Upvote because it disables the alarm in finally. Without that, this is a recipe for disaster. – Petar Donchev Jul 25 '18 at 08:38
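
Picking up scottmrogowski's point: a minimal sketch of the corrected signature (only the default changes; the body stays as above):

def timeout(func, args=(), kwargs=None, timeout_duration=1, default=None):
    # None instead of {}: a mutable default would be evaluated once and
    # shared across every call to timeout()
    kwargs = kwargs if kwargs is not None else {}
    # ... rest of the function is identical to the version above ...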
42

I ran across this thread when searching for a timeout call on unit tests. I didn't find anything simple in the answers or third-party packages, so I wrote the decorator below, which you can drop right into code:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

Then it's as simple as this to timeout a test or any function you like:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...
– Rich
  • Be careful, since this does not terminate the function after the timeout is reached! – Sylvain Sep 16 '16 at 10:35
  • Note that on Windows, this spawns an entirely new process - which will eat into the time to timeout, perhaps by a lot if the dependencies take a long time to set up. – Russia Must Remove Putin Jan 11 '17 at 17:59
  • Yes, this needs some tweaking. It leaves threads going forever. – sudo Jan 28 '17 at 21:15
  • IDK if this is the best way, but you can try/catch `Exception` inside of func_wrapper and do `pool.close()` after the catch to ensure the thread always dies afterwards no matter what. Then you can throw `TimeoutError` or whatever you want after. Seems to work for me. – sudo Jan 28 '17 at 21:23
  • This is useful, but once I have done it lots of times, I get `RuntimeError: can't start new thread`. Will it still work if I ignore it, or is there something else I can do to get around this? Thanks in advance! – 56- Jul 26 '17 at 12:39
  • As Benjie says. After "thread.error: can't start new thread", I can no longer use it. – 1.618 Feb 25 '18 at 05:44
  • How can I use the same with FastAPI as a decorator? Because when I tried it with FastAPI, it got executed while starting the application itself instead of being called at the time of API execution. – Aadhi Verma Apr 27 '23 at 12:53
36

The stopit package, found on PyPI, seems to handle timeouts well.

I like the @stopit.threading_timeoutable decorator, which adds a timeout parameter to the decorated function and does what you expect: it stops the function.

Check it out on PyPI: https://pypi.python.org/pypi/stopit
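
A minimal sketch of that decorator, assuming stopit's documented behavior of returning the `default` value when the timeout fires (compare Bence Kaulics' comment below):

import time
import stopit

@stopit.threading_timeoutable(default='not finished')
def infinite_loop():
    # the decorator adds a `timeout` keyword argument to this function
    while True:
        time.sleep(0.1)

print(infinite_loop(timeout=5))  # prints 'not finished' after ~5 seconds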

– egeland
  • The library claims some functionality does not work on Windows. – Stefan Simik Jun 03 '19 at 16:48
  • For people who might get confused like me: the `stopit.utils.TimeoutException` doesn't stop your code! The code continues normally after it! I spent 30 minutes on a program that was functioning normally. Really good answer! – Charalamm Sep 01 '20 at 08:08
  • With stopit-1.1.2 the basic timeout decorator ``@stopit.threading_timeoutable(default='not finished')`` works well on Linux and Windows alike. Simple and excellent solution if you only want a simple timeout. – Bence Kaulics Aug 19 '21 at 07:49
26

I am the author of wrapt_timeout_decorator.

Most of the solutions presented here work wonderfully under Linux at first glance, because we have fork() and signals, but on Windows things look a bit different. And when it comes to sub-threads on Linux, you can't use signals anymore.

In order to spawn a process under Windows, it needs to be picklable, and many decorated functions or class methods are not.

So you need to use a better pickler like dill and multiprocess (not pickle and multiprocessing); that's why you can't use ProcessPoolExecutor (or only with limited functionality).

For the timeout itself, you need to define what timeout means, because on Windows it will take considerable (and not determinable) time to spawn the process. This can be tricky on short timeouts. Let's assume spawning the process takes about 0.5 seconds (easily!). If you give a timeout of 0.2 seconds, what should happen? Should the function time out after 0.5 + 0.2 seconds (so let the method run for 0.2 seconds)? Or should the called process time out after 0.2 seconds (in that case, the decorated function will ALWAYS time out, because in that time it is not even spawned)?

Also, nested decorators can be nasty, and you can't use signals in a sub-thread. If you want to create a truly universal, cross-platform decorator, all this needs to be taken into consideration (and tested).

Other issues are passing exceptions back to the caller, as well as logging issues (if used in the decorated function - logging to files in another process is NOT supported).

I tried to cover all the edge cases. You might look into the package wrapt_timeout_decorator, or at least test your own solutions against the unit tests used there.

@Alexis Eggermont - unfortunately I don't have enough points to comment; maybe someone else can notify you - I think I solved your import issue.

– bitranox
  • This is a life saver for me! My problem was that sometimes a multiprocessing worker stalled for no reason and was consuming a lot of memory and CPU in the sleep state. I tried various wrappers for multiprocessing which have an option for pool timeout, but each gave me other problems, like processes not being killed after the pool is terminated. Now with this decorator, simply after a long timeout, the function will be killed, along with the processes spawned inside it. It gives me BrokenPipeError for abruptly closing the pool, but it solved my main problem. Thank you! Any suggestions to handle BrokenPipeError? – Arjun Sankarlal Jul 23 '20 at 07:50
  • @ArjunSankarlal: of course, if the worker is killed the pipe will be broken. You need to catch the broken pipe error in the scheduler task and clean up properly. – bitranox Jul 24 '20 at 09:05
  • Yes, I understand, and I did try/except with BrokenPipeError, but it was not caught. I am using it in a webserver. I have a catch for BrokenPipeError and a general Exception. When the timeout occurred, I got the general exception, not the broken pipe error. But after a few seconds, the server printed BrokenPipeError in the console and served the other requests without any problem. Maybe I should introduce a delay to check if the pool is broken and then return? – Arjun Sankarlal Jul 26 '20 at 04:09
  • Thanks for your library. In my case it is the best solution. – jonsbox Mar 25 '23 at 06:14
23

There are a lot of suggestions, but none using concurrent.futures, which I think is the most legible way to handle this.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate the function on timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

Super simple to read and maintain.

We make a pool, submit a single task, and then wait up to 5 seconds before raising a TimeoutError that you can catch and handle however you need.

Native to Python 3.2+ and backported to 2.7 (pip install futures).

Switching between threads and processes is as simple as replacing ProcessPoolExecutor with ThreadPoolExecutor.

If you want to terminate the Process on timeout I would suggest looking into Pebble.
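
A sketch of that Pebble variant, hedged against Pebble's docs: ProcessPool.schedule takes a timeout argument and terminates the worker process when it expires.

import time
from concurrent.futures import TimeoutError
from pebble import ProcessPool

def fnc(x):
    time.sleep(x)  # stand-in for the possibly stalling work
    return x

if __name__ == '__main__':
    with ProcessPool() as pool:
        future = pool.schedule(fnc, args=(10,), timeout=5)
        try:
            print(future.result())  # raises TimeoutError after 5 seconds
        except TimeoutError:
            print("fnc took longer than 5 seconds; the worker was terminated")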

– Brian
  • What does "Warning: this does not terminate the function on timeout" mean? – Scott Stafford Dec 08 '17 at 16:25
  • @ScottStafford Processes/threads don't end just because a TimeoutError has been raised. So the process or the thread will still try to run to completion and will not automatically give you back control at your timeout. – Brian Dec 11 '17 at 07:59
  • Would this let me save any results that are intermediate at that time? E.g. if I have a recursive function that I set a timeout of 5 on, and in that time I have partial results, how do I write the function to return the partial results on timeout? – SumNeuron Mar 16 '18 at 11:49
  • I'm using exactly this; however, I have 1000 tasks, each allowed 5 seconds before timeout. My problem is that cores get clogged on tasks that never end, because the timeout is only applied to the total of tasks, not to individual tasks. concurrent.futures does not provide a solution to this afaik. – Bastiaan Apr 01 '19 at 04:42
21

Building on and enhancing the answer by @piro, you can build a context manager. This allows for very readable code which disables the alarm signal after a successful run (by setting signal.alarm(0)):

from contextlib import contextmanager
import signal
import time

@contextmanager
def timeout(duration):
    def timeout_handler(signum, frame):
        raise TimeoutError(f'block timed out after {duration} seconds')
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(duration)
    try:
        yield
    finally:
        signal.alarm(0)

def sleeper(duration):
    time.sleep(duration)
    print('finished')

Example usage:

In [19]: with timeout(2):
    ...:     sleeper(1)
    ...:     
finished

In [20]: with timeout(2):
    ...:     sleeper(3)
    ...:         
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
      1 with timeout(2):
----> 2     sleeper(3)
      3 

<ipython-input-7-a75b966bf7ac> in sleeper(t)
      1 def sleeper(t):
----> 2     time.sleep(t)
      3     print('finished')
      4 

<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
      2 def timeout(duration):
      3     def timeout_handler(signum, frame):
----> 4         raise TimeoutError(f'block timed out after {duration} seconds')
      5     signal.signal(signal.SIGALRM, timeout_handler)
      6     signal.alarm(duration)

TimeoutError: block timed out after 2 seconds
– boogie
  • This is a great way of doing it indeed. Just to add for completeness, the required import for this to work: `from contextlib import contextmanager` – mdev Mar 07 '21 at 11:35
  • An issue with an earlier implementation of this context manager was that an exception within the code block inside the context could result in the signal alarm not being disabled. To fix it, a `try` + `finally` should be added, similar to my timeout function decorator below (https://stackoverflow.com/a/66515961/1522304) – mdev Apr 19 '21 at 14:33
  • This method doesn't seem reliable. When I have very compute-intensive code running, it seems the timeout never occurs. When I pause that process in a debugger with a breakpoint, it does eventually time out. – ShnitzelKiller Oct 15 '22 at 00:19
  • @ShnitzelKiller from the `signal` [docs](https://docs.python.org/3/library/signal.html#execution-of-python-signal-handlers): A long-running calculation implemented purely in C (such as regular expression matching on a large body of text) may run uninterrupted for an arbitrary amount of time, regardless of any signals received. The Python signal handlers will be called when the calculation finishes. – alex Dec 06 '22 at 12:03
18

timeout-decorator (https://pypi.org/project/timeout-decorator/) is a great, easy-to-use, and reliable PyPI project.

Installation:

pip install timeout-decorator

Usage:

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print("Start")
    for i in range(1, 10):
        time.sleep(1)
        print("%d seconds have passed" % i)

if __name__ == '__main__':
    mytest()
– Gil
  • I appreciate the clear solution. But could anyone explain how this library works, especially when dealing with multithreading? Personally I fear using an unknown mechanism to handle threads or signals. – wsysuper Nov 25 '18 at 03:07
  • @wsysuper the lib has 2 modes of operation: open a new thread or a new subprocess (which is supposed to be thread safe) – Gil Dec 02 '18 at 13:35
  • It seems that it does not work under Linux, unlike other solutions based on signal.SIGALRM – Mathieu Roger Feb 23 '21 at 16:59
  • This solution is not working on Python 3.7.6. Thought you should know! That is too bad for me. – Andre Carneiro May 11 '21 at 13:55
  • @Gil how can I utilize it with FastAPI? – Aadhi Verma Apr 27 '23 at 13:10
10

timeout-decorator doesn't work on Windows systems, as Windows doesn't support signal well.

If you use timeout-decorator on a Windows system you will get the following:

AttributeError: module 'signal' has no attribute 'SIGALRM'

Some suggested using use_signals=False, but that didn't work for me.

The author, @bitranox, created the following package:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

Code Sample:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

Gives the following exception:

TimeoutError: Function mytest timed out after 5 seconds
– as - if
  • This sounds like a very nice solution. Strangely, the line `from wrapt_timeout_decorator import *` seems to kill some of my other imports. For example, I get `ModuleNotFoundError: No module named 'google.appengine'`, but I don't get this error if I don't import wrapt_timeout_decorator. – Alexis Eggermont Apr 23 '19 at 14:48
  • @AlexisEggermont I was about to use this with appengine... so I am very curious whether this error persisted? – PascalVKooten Sep 03 '19 at 15:14
  • When testing this, nothing seems to be printed out from either `message` or `seconds passed`. – Avan Apr 03 '21 at 00:12
  • The 'Code Sample' worked perfectly on my Windows machine. My first attempt with the Code Sample did not work because I wrongly named my file 'signal.py', and got the error "NameError: name 'timeout' is not defined". When you run the Code Sample as a py file, name it 'my_signal.py' or anything other than 'signal.py'. – stok Oct 30 '21 at 01:14
7

Just in case it is helpful for anyone: building on the answer by @piro, I've made a function decorator:

import time
import signal
from functools import wraps


def timeout(timeout_secs: int):
    def wrapper(func):
        @wraps(func)
        def time_limited(*args, **kwargs):
            # Register a handler for the timeout
            def handler(signum, frame):
                raise Exception(f"Timeout for function '{func.__name__}'")

            # Register the signal function handler
            signal.signal(signal.SIGALRM, handler)

            # Define a timeout for your function
            signal.alarm(timeout_secs)

            try:
                return func(*args, **kwargs)
            finally:
                # disable the signal alarm
                signal.alarm(0)

        return time_limited

    return wrapper

Using the wrapper on a function with a 20-second timeout would look something like this:

    @timeout(20)
    def my_slow_or_never_ending_function(name):
        while True:
            time.sleep(1)
            print(f"Yet another second passed {name}...")

    try:
        results = my_slow_or_never_ending_function("Yooo!")
    except Exception as e:
        print(f"ERROR: {e}")
– mdev
6

Highlights

  • Raises TimeoutError: uses exceptions to alert on timeout; can easily be modified
  • Cross Platform: Windows & Mac OS X
  • Compatibility: Python 3.6+ (I also tested on Python 2.7 and it works with small syntax adjustments)

For a full explanation and an extension to parallel maps, see here: https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts

Minimal Example

>>> @killer_call(timeout=4)
... def bar(x):
...        import time
...        time.sleep(x)
...        return x
>>> bar(10)
Traceback (most recent call last):
  ...
__main__.TimeoutError: function 'bar' timed out after 4s

and as expected

>>> bar(2)
2

Full code

import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill

from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any

class TimeoutError(Exception):

    def __init__(self, func: Callable, timeout: int):
        self.t = timeout
        self.fname = func.__name__

    def __str__(self):
        return f"function '{self.fname}' timed out after {self.t}s"


def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
    """lemmiwinks crawls into the unknown"""
    q.put(dill.loads(func)(*args, **kwargs))


def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
    """
    Single function call with a timeout

    Args:
        func: the function
        timeout: The timeout in seconds
    """

    if not isinstance(timeout, int):
        raise ValueError(f'timeout needs to be an int. Got: {timeout}')

    if func is None:
        return functools.partial(killer_call, timeout=timeout)

    @functools.wraps(func)
    def _inners(*args, **kwargs) -> Any:
        q_worker = mp.Queue()
        proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
        proc.start()
        try:
            return q_worker.get(timeout=timeout)
        except mpq.Empty:
            raise TimeoutError(func, timeout)
        finally:
            try:
                proc.terminate()
            except:
                pass
    return _inners

if __name__ == '__main__':
    @killer_call(timeout=4)
    def bar(x):
        import time
        time.sleep(x)
        return x

    print(bar(2))
    bar(10)

Notes

You will need to import inside the function because of the way dill works.

This also means these functions may not be compatible with doctest if there are imports inside your target functions. You will get an issue with __import__ not found.

– Alexander McFarlane
5

We can use signals for the same purpose. I think the below example will be useful for you. It is very simple compared to threads.

import signal

class myException(Exception):
    pass

def timeout(signum, frame):
    raise myException

# this is an infinite loop, never ending under normal circumstances
def main():
    print('Starting Main')
    while 1:
        print('in main')

# SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

# change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print("whoops")
– A R
  • It would be better to choose a specific exception and to catch only it. Bare ``try: ... except: ...`` is always a bad idea. – hivert Jul 23 '13 at 11:28
  • I agree with you, hivert. – A R Jul 26 '13 at 06:58
  • While I understand the reason, as a sysadmin/integrator I have to disagree - python code is notorious for neglecting error handling, and handling the one thing you expect isn't good enough for quality software. You can handle the 5 things you plan for AND a generic strategy for other things. "Traceback, None" is not a strategy, it's an insult. – Florian Heigl Jun 19 '20 at 00:15
  • I don't understand you at all. If I'm planning to do some timeout for a specific function, how do I do it in elegant style? What strategy must I plan for when the called function depends on inelegant components? How do I perfectly glue this one? Please explain to me with working elegant examples. – Znik Sep 21 '20 at 18:50
3

Another solution, with asyncio:

If you want to cancel the background task, and not just time out the running main code, then you need explicit communication from the main thread to ask the task's code to cancel, like a threading.Event() (see the sketch after the code below):

import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor


class SingletonTimeOut:
    pool = None

    @classmethod
    def run(cls, to_run: functools.partial, timeout: float):
        pool = cls.get_pool()
        loop = cls.get_loop()
        try:
            task = loop.run_in_executor(pool, to_run)
            return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
        except asyncio.TimeoutError as e:
            error_type = type(e).__name__ #TODO
            raise e

    @classmethod
    def get_pool(cls):
        if cls.pool is None:
            cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
        return cls.pool

    @classmethod
    def get_loop(cls):
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            asyncio.set_event_loop(asyncio.new_event_loop())
            # print("NEW LOOP" + str(threading.current_thread().ident))
            return asyncio.get_event_loop()

# ---------------

TIME_OUT = 0.2  # seconds

def toto(input_items,nb_predictions):
    return 1

to_run = functools.partial(toto,
                           input_items=1,
                           nb_predictions="a")

results = SingletonTimeOut.run(to_run, TIME_OUT)
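
And a minimal sketch of the cooperative cancellation mentioned at the top; it assumes the task's own code can check the event between chunks of work:

import threading
import time

stop_event = threading.Event()

def cancellable_task():
    # the task must cooperate by polling the event between work chunks
    while not stop_event.is_set():
        time.sleep(0.05)  # stand-in for one chunk of real work

worker = threading.Thread(target=cancellable_task)
worker.start()
worker.join(timeout=0.2)  # wait up to the timeout, as with TIME_OUT above
if worker.is_alive():
    stop_event.set()  # explicit communication: ask the task to stop itself
    worker.join()
    print("task cancelled cooperatively")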

– raphaelauv
3

The func_timeout package by Tim Savannah has worked well for me.

Installation:

pip install func_timeout

Usage:

import time
from func_timeout import func_timeout, FunctionTimedOut

def my_func(n):
    time.sleep(n)

time_to_sleep = 10

# time out after 2 seconds using kwargs
func_timeout(2, my_func, kwargs={'n' : time_to_sleep})

# time out after 2 seconds using args
func_timeout(2, my_func, args=(time_to_sleep,))
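
To do something else on timeout, catch the FunctionTimedOut exception that func_timeout raises (already imported above):

try:
    func_timeout(2, my_func, args=(time_to_sleep,))
except FunctionTimedOut:
    print("my_func did not complete within 2 seconds, doing something else")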
– emj
2
This one is a wrapper script rather than an in-process solution: it runs the given command as a subprocess and terminates the subprocess if it outlives the timeout.

#!/usr/bin/python
# usage: timeout.py <seconds> <command> [args...]
import sys, subprocess, threading

# Run the command given on the command line as a subprocess
proc = subprocess.Popen(sys.argv[2:])
# Arrange for the subprocess to be terminated when the timer fires
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
# The subprocess finished on its own; cancel the pending timer
timer.cancel()
sys.exit(proc.returncode)
– Hal Canary
  • While this code may answer the question, providing additional context regarding how and/or why it solves the problem would improve the answer's long-term value. – Dan Cornilescu Apr 27 '16 at 13:48
  • I don't think this answers the question, as `subprocess.Popen(sys.argv[2:])` would be used to run [a command](https://stackabuse.com/pythons-os-and-subprocess-popen-commands/), not a Python function call. Unless the intent is to wrap the other Python script in this one, but that may not make for the easiest recovery from the stall. – Alex Moore-Niemi Dec 05 '20 at 22:01
1

I had a need for nestable timed interrupts (which SIGALRM can't do) that won't get blocked by time.sleep (which the thread-based approach can't do). I ended up copying and lightly modifying code from here: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

The code itself:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

and a usage example:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print('raised', e.id_)
        sleep(30)
except alarm.TimeoutError as e:
    print('raised', e.id_)
else:
    print('nope.')
– James
1

I faced the same problem, but my situation required working in a sub-thread, where signal didn't work for me, so I wrote a Python package, timeout-timer, to solve this problem. It supports use as a context manager or a decorator, and uses either a signal or a sub-thread module to trigger a timeout interrupt:

import time
from time import sleep

from timeout_timer import timeout, TimeoutInterrupt

class TimeoutInterruptNested(TimeoutInterrupt):
    pass

def test_timeout_nested_loop_both_timeout(timer="thread"):
    cnt = 0
    try:
        with timeout(5, timer=timer):
            try:
                with timeout(2, timer=timer, exception=TimeoutInterruptNested):
                    sleep(2)
            except TimeoutInterruptNested:
                cnt += 1
            time.sleep(10)
    except TimeoutInterrupt:
        cnt += 1
    assert cnt == 2

see more: https://github.com/dozysun/timeout-timer

– Dozy Sun
  • The thread timer mechanism works fine in a sub-thread: it creates another sub-thread as a timer, and after the timeout seconds that sub-thread calls the parent thread's stop, which raises a TimeoutInterrupt exception captured in the parent thread. – Dozy Sun Sep 22 '21 at 03:05
1

Here is a simple example of running one method with a timeout, and also retrieving its value if successful.

import multiprocessing
import time

ret = {"foo": False}


def worker(queue):
    """worker function"""

    ret = queue.get()

    time.sleep(1)

    ret["foo"] = True
    queue.put(ret)


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    queue.put(ret)

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join(timeout=10)

    if p.exitcode is None:
        print("The worker timed out.")
        p.terminate()  # the worker is still running at this point; stop it
        p.join()
    else:
        print(f"The worker completed and returned: {queue.get()}")
– Martin Alexandersson
0

Here is a slight improvement to the given thread-based solution.

The code below supports exceptions:

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception as message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.is_alive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

Invoking it with a 5-second timeout:

result = runFunctionWithTimeout(remote_calculate, (myarg,), timeout_duration=5)
– diemacht
  • This will raise a new exception hiding the original traceback. See my version below... – Meitham Dec 14 '12 at 11:20
  • This is also unsafe, as if within `runFunctionCatchExceptions()` certain Python functions obtaining the GIL are called. E.g. the following would never, or for a very long time, return if called within the function: `eval(2**9999999999**9999999999)`. See http://stackoverflow.com/questions/22138190/python-threading-thread-start-doesnt-return-control-to-main-thread – Mikko Ohtamaa Oct 27 '14 at 12:53
0

Here is a POSIX version that combines many of the previous answers to deliver the following features:

  1. Handles subprocesses blocking the execution.
  2. Usage of the timeout function on class member functions.
  3. Strict requirement on time-to-terminate.

Here is the code and some test cases:

import threading
import signal
import os
import time

class TerminateExecution(Exception):
    """
    Exception to indicate that execution has exceeded the preset running time.
    """


def quit_function(pid):
    # Killing all subprocesses
    os.setpgrp()
    os.killpg(0, signal.SIGTERM)

    # Killing the main thread
    os.kill(pid, signal.SIGTERM)


def handle_term(signum, frame):
    raise TerminateExecution()


def invoke_with_timeout(timeout, fn, *args, **kwargs):
    # Setting a sigterm handler and initiating a timer
    old_handler = signal.signal(signal.SIGTERM, handle_term)
    timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
    terminate = False

    # Executing the function
    timer.start()
    try:
        result = fn(*args, **kwargs)
    except TerminateExecution:
        terminate = True
    finally:
        # Restoring original handler and cancel timer
        signal.signal(signal.SIGTERM, old_handler)
        timer.cancel()

    if terminate:
        raise BaseException("xxx")

    return result

### Test cases
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        time.sleep(1)
    print('countdown finished')
    return 1337


def really_long_function():
    time.sleep(10)


def really_long_function2():
    os.system("sleep 787")


# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337

# Testing various scenarios
t1 = time.time()
try:
    print(invoke_with_timeout(1, countdown, 3))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function2))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)


t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)


class X:
    def __init__(self):
        self.value = 0

    def set(self, v):
        self.value = v


x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9
– Troels
0

I intend to kill the process if the job is not done, using both a thread and a process to achieve this.

from concurrent.futures import ThreadPoolExecutor

from time import sleep
import multiprocessing


# test case 1
def worker_1(a,b,c):
    for _ in range(2):
        print('very time consuming sleep')
        sleep(1)

    return a+b+c

# test case 2
def worker_2(in_name):
    for _ in range(10):
        print('very time consuming sleep')
        sleep(1)

    return 'hello '+in_name

The actual class, as a context manager:

class FuncTimer():
    def __init__(self,fn,args,runtime):
        self.fn = fn
        self.args = args
        self.queue = multiprocessing.Queue()
        self.runtime = runtime
        self.process = multiprocessing.Process(target=self.thread_caller)

    def thread_caller(self):
        with ThreadPoolExecutor() as executor:
            future = executor.submit(self.fn, *self.args)
            self.queue.put(future.result())

    def __enter__(self):
        return self

    def start_run(self):
        self.process.start()
        self.process.join(timeout=self.runtime)
        if self.process.exitcode is None:
            self.process.kill()
        if self.process.exitcode is None:
            out_res = None
            print('killed premature')
        else:
            out_res = self.queue.get()
        return out_res


    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.process.kill()

How to use it

print('testing case 1') 
with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 5) as fp: 
    res = fp.start_run()
    print(res)

print('testing case 2')
with FuncTimer(fn=worker_2,args=('ram',),runtime = 5) as fp: 
    res = fp.start_run()
    print(res)
– PankajKushwaha
0

Here is a simple and easy-to-use decorator that returns a given default if the execution time of the function expires, inspired by the first answer to this question:

import signal
from functools import wraps
import time

def timeout(seconds, default=None):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            def signal_handler(signum, frame):
                raise TimeoutError("Timed out!")
            # Set up the signal handler for the timeout
            signal.signal(signal.SIGALRM, signal_handler)

            # Start the timer; setitimer accepts fractional seconds,
            # unlike signal.alarm, which only takes whole integers
            signal.setitimer(signal.ITIMER_REAL, seconds)

            try:
                result = func(*args, **kwargs)
            except TimeoutError:
                return default
            finally:
                # cancel the timer whether the call finished or timed out
                signal.setitimer(signal.ITIMER_REAL, 0)
            
            return result
        
        return wrapper
    
    return decorator

@timeout(0.2, default="Timeout!")
def long_function_call(meal):
    time.sleep(3)
    return f"I have executed fully, {meal} is ready"

@timeout(1.3, default="Timeout!")
def less_long_function_call(meal):
    time.sleep(1)
    return f"I have executed fully, {meal} is ready"

result = long_function_call("bacon")
print(result)  # Prints "Timeout!" if the function execution exceeds 0.2 seconds
result = less_long_function_call("bacon")
print(result)  # Prints "Timeout!" if the function execution exceeds 1.3 seconds
– Caridorc