My use case: I want to call fcntl.flock() on a file, but with a timeout. Following the recipe in "Timeout on a function call", I wrapped my code in a context manager that implements the timeout via a Unix signal:
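import signal
from contextlib import contextmanager

@contextmanager
def doTimeout(seconds):
    """Creates a "with" context that times out after the
    specified time."""
    def timeout_handler(signum, frame):
        # Do nothing; the signal only needs to interrupt the blocking call.
        pass
    original_handler = signal.signal(signal.SIGALRM, timeout_handler)
    try:
        signal.alarm(seconds)
        yield
    finally:
        # Cancel any pending alarm and restore the previous handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, original_handler)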
and used it as follows:
with doTimeout(timeout):
    try:
        fcntl.flock(self.file, fcntl.LOCK_EX)
        self.locked = True
        return True
    except (OSError, IOError) as e:
        if e.errno == errno.EINTR:
            return False
        raise
This all worked perfectly, but unfortunately I can only do this from the main thread: Python only lets the main thread install signal handlers, and the handlers always run in the main thread. Is there a way to do it from another thread?
My alternatives at this point are to periodically test the lock and then sleep, or launch a subprocess. Neither of these is ideal; is there a better way?
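
For reference, the polling alternative I have in mind would look roughly like the sketch below. It assumes Python 3.3+ (for time.monotonic) and a platform where a non-blocking flock() fails with EAGAIN or EACCES when the lock is held elsewhere. The name flock_with_timeout and the poll_interval parameter are made up for illustration; they are not part of fcntl.

```python
import errno
import fcntl
import time


def flock_with_timeout(fileobj, timeout, poll_interval=0.05):
    """Hypothetical helper: poll for an exclusive flock until `timeout` seconds pass.

    Returns True if the lock was acquired, False if the deadline expired.
    Uses no signals, so it can be called from any thread.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # LOCK_NB makes flock() fail immediately instead of blocking.
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
            return True
        except (OSError, IOError) as e:
            # EAGAIN/EACCES mean "held by someone else"; anything else is a real error.
            if e.errno not in (errno.EAGAIN, errno.EACCES):
                raise
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Sleep briefly, but never past the deadline.
        time.sleep(min(poll_interval, remaining))
```

The drawback is the one mentioned above: the lock is noticed up to poll_interval late, and a shorter interval means more wakeups while waiting.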