I'm using the python unittest framework to perform unit tests. The python version i'm using is 3.6. I'm using the windows OS.
My production code is currently a little bit unstable (as i added some new functionality) and tends to hangup in some internal asynchronous loops. I'm working on fixing those hangups, currently. However, the tracking of those bugs is hindered by the corresponding testcases hanging up, too. I would merely like that the corresponding testcases stop after e.g. 500ms if they not run through and be marked as FAILED in order to let all the other testcases beeing continued.
Unfortunately, the unittest framework does not support timeouts (If i had known in advance ...). Therefore i'm searching for a workaround of this. If some package would add that missing functionality to the unittest framework i would be willing to try. What i don't want is that my production code relies on too much non standard packages, for the unittests this would be OK.
I'm a little lost on howto adding such functionality to the unittests.
Therefore i just tried out some code from here: How to limit execution time of a function call?. As they said somewhere that threading
should not be used to implement timeouts i tried using multiprocessing
, too.
Please note that the solutions proposed here How to specify test timeout for python unittest? do not work, too. They are designed for linux (using SIGALRM).
import multiprocessing
# import threading
import time
import unittest
class InterruptableProcess(multiprocessing.Process):
# class InterruptableThread(threading.Thread):
def __init__(self, func, *args, **kwargs):
super().__init__()
self._func = func
self._args = args
self._kwargs = kwargs
self._result = None
def run(self):
self._result = self._func(*self._args, **self._kwargs)
@property
def result(self):
return self._result
class timeout:
def __init__(self, sec):
self._sec = sec
def __call__(self, f):
def wrapped_f(*args, **kwargs):
it = InterruptableProcess(f, *args, **kwargs)
# it = InterruptableThread(f, *args, **kwargs)
it.start()
it.join(self._sec)
if not it.is_alive():
return it.result
#it.terminate()
raise TimeoutError('execution expired')
return wrapped_f
class MyTests(unittest.TestCase):
def setUp(self):
# some initialization
pass
def tearDown(self):
# some cleanup
pass
@timeout(0.5)
def test_XYZ(self):
# looong running
self.assertEqual(...)
The code behaves very differently using threads vs. processes. In the first case it runs through but continues execution of the function despite timeout (which is unwanted). In the second case it complains about unpickable objects.
In both cases i would like to know howto do proper cleanup, e.g. call the unittest.TestCase.tearDown
method on timeout from within the decorator class.