If you need a context manager that acquires a lock in a non-blocking manner but keeps retrying until the lock is finally acquired, you could do it like this:
    import contextlib
    import threading

    @contextlib.contextmanager
    def non_blocking_lock(lock: threading.Lock):
        # Retry until the lock is acquired. Each failed attempt returns
        # immediately, so the thread never blocks inside acquire().
        while not lock.acquire(blocking=False):
            pass
        try:
            yield  # Lock has been successfully acquired
        finally:
            lock.release()
It can be used exactly like the normal lock context manager:
    class TestClass:
        def __init__(self):
            self._lock = threading.Lock()

        def method(self):
            with non_blocking_lock(self._lock):
                # do something that should only be done by one thread at a time
                ...
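The difference is that the thread never blocks inside acquire(): it polls the lock and retries until it succeeds. Note that a plain blocking acquire() also releases the GIL while it waits, so the practical difference is the polling itself, which keeps the waiting thread busy and costs CPU time. I used it to fix some deadlocks.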
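A quick way to check that the guarded section is still mutually exclusive is to hammer a shared counter from several threads. This is only a minimal sketch: `Counter` and `run_demo` are hypothetical names made up for the illustration, and the context manager is repeated so the snippet runs on its own.

```python
import contextlib
import threading


@contextlib.contextmanager
def non_blocking_lock(lock: threading.Lock):
    # Same retry loop as in the answer: poll until the lock is acquired.
    while not lock.acquire(blocking=False):
        pass
    try:
        yield
    finally:
        lock.release()


class Counter:
    """Hypothetical example class, not part of the original answer."""

    def __init__(self):
        self._lock = threading.Lock()
        self.value = 0

    def increment(self):
        with non_blocking_lock(self._lock):
            # Read-modify-write that must not interleave between threads.
            current = self.value
            self.value = current + 1


def run_demo(num_threads=4, increments=1000):
    """Start the workers, wait for them, and return the final count."""
    counter = Counter()

    def worker():
        for _ in range(increments):
            counter.increment()

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value


if __name__ == "__main__":
    print(run_demo())
```

If the lock works as intended, the returned value equals `num_threads * increments`, because no increment is lost.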
Unlike the other solutions, the guarded code always gets executed eventually: the context manager does not simply return False or raise an exception when the lock cannot be acquired immediately.
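One possible refinement, sketched here under my own assumptions, is to sleep briefly between attempts so the waiting thread does not spin at full speed. The name `polling_lock` and the parameters `interval` and `timeout` are hypothetical and not part of the code above. With the default `timeout=None` it keeps retrying forever, like the original; a timeout is optional and turns it into the "raise an exception" style of the other solutions.

```python
import contextlib
import threading
import time


@contextlib.contextmanager
def polling_lock(lock, interval=0.001, timeout=None):
    """Hypothetical variant: retry with a short sleep between attempts.

    interval: seconds to sleep after each failed attempt (assumed default).
    timeout:  optional upper bound in seconds; None means retry forever.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while not lock.acquire(blocking=False):
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("could not acquire the lock in time")
        time.sleep(interval)  # give up the CPU instead of spinning
    try:
        yield  # Lock has been successfully acquired
    finally:
        lock.release()


if __name__ == "__main__":
    lock = threading.Lock()
    with polling_lock(lock):
        print("lock held:", lock.locked())
```

Whether the sleep is worth it depends on how long the lock is typically held: for very short critical sections the plain loop reacts faster, while for longer ones the sleep saves CPU time.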
Correct me if you see any caveats with this solution.