I'm working in Python 3.7 with multiple threads that communicate through queues. If my program is interrupted or encounters an unexpected error, I would like it to clean up fully and exit gracefully. However, I have noticed that on some occasions, when a non-blocking Queue.get() call is interrupted (for example by SIGINT), the Queue never releases its lock, so my attempt to clean up the thread associated with that queue stalls forever. I then have to send more SIGINTs manually, which is undesirable for unsupervised execution.
This behavior can be seen in the following code. Pressing Ctrl-C usually does not exit the program immediately; instead, it hangs. The final traceback shows that it is stuck waiting to acquire a lock.
import threading
import queue

def test_function(test_queue, thread_terminator):
    while True:
        try:
            test_queue.put_nowait("test")
        except queue.Full:
            pass
        if thread_terminator.is_set():
            return

if __name__ == '__main__':
    test_queue = queue.Queue()
    thread_terminator = threading.Event()
    test_thread = threading.Thread(target=test_function, args=(test_queue, thread_terminator))
    test_thread.start()
    while True:
        try:
            test_queue.get_nowait()
        except queue.Empty:
            pass
        except:
            thread_terminator.set()
            test_thread.join()
            raise
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 1281, in _shutdown
    t.join()
  File "/usr/local/lib/python3.7/threading.py", line 1032, in join
    self._wait_for_tstate_lock()
  File "/usr/local/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
Why does this happen? Python queues are fully thread-safe, right? And the queue library here https://github.com/python/cpython/blob/master/Lib/queue.py seems to acquire its lock through context managers, so it seems to me that the lock should be released in the event of an exception.
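The expectation above rests on how a with statement treats a lock. A minimal sketch (a plain threading.Lock and a hypothetical function body_that_raises, for illustration only) showing that guarantee for an exception raised inside the block:

```python
import threading

lock = threading.Lock()


def body_that_raises():
    # `with lock:` behaves roughly like:
    #     lock.acquire()
    #     try:
    #         <body>
    #     finally:
    #         lock.release()
    with lock:
        raise ValueError("simulated failure inside the with-block")


try:
    body_that_raises()
except ValueError:
    pass

print(lock.locked())  # False: __exit__ released the lock despite the exception
```

This only covers exceptions raised while the body is running. Ctrl-C surfaces as a KeyboardInterrupt that the main thread raises asynchronously, between bytecode instructions. As far as I understand, CPython does not guarantee it cannot land just after the lock has been acquired but before the protected region is entered, in which case __exit__ never runs and the lock stays held. In queue.py the with targets are threading.Condition objects wrapping self.mutex, and Condition.__enter__ is itself Python code, which leaves room for exactly that window; whether this is the path taken in the example above is an assumption, not something confirmed here.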
I wrote some code to release the lock manually, which seemed to fix the issue: when exiting with SIGINT, I no longer encounter the hang described above.
import threading
import queue

def test_function(test_queue, thread_terminator):
    while True:
        try:
            test_queue.put_nowait("test")
        except queue.Full:
            pass
        if thread_terminator.is_set():
            return

if __name__ == '__main__':
    test_queue = queue.Queue()
    thread_terminator = threading.Event()
    test_thread = threading.Thread(target=test_function, args=(test_queue, thread_terminator))
    test_thread.start()
    while True:
        try:
            test_queue.get_nowait()
        except queue.Empty:
            pass
        except:
            thread_terminator.set()
            try:
                print("releasing lock")
                test_queue.mutex.release()
                print("lock released")
            except RuntimeError:
                print("lock already released")
            test_thread.join()
            raise
But this workaround seems sketchy to me. What if another thread has legitimately acquired the queue's lock, and this code releases it out from under that thread? I haven't encountered that scenario yet, but I am concerned about it.
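To make that concern concrete: queue.Queue creates its mutex with threading.Lock(), and a plain Lock has no notion of an owner, so any thread may release it. A minimal sketch (the helper release_from_other_thread is hypothetical, for illustration only) comparing it with an RLock, which does track its owner:

```python
import threading


def release_from_other_thread(lock):
    """Hold `lock` in a helper thread, then try to release it from the caller.

    Returns True if the release succeeded, False if it raised RuntimeError.
    """
    acquired = threading.Event()
    finish = threading.Event()

    def holder():
        lock.acquire()   # the helper thread "legitimately" takes the lock
        acquired.set()
        finish.wait()    # keep holding it while the caller interferes

    t = threading.Thread(target=holder)
    t.start()
    acquired.wait()
    try:
        lock.release()   # the caller never acquired this lock
        released = True
    except RuntimeError:
        released = False
    finish.set()
    t.join()
    return released


# A plain Lock (what queue.Queue uses for its mutex) can be released by any thread.
print(release_from_other_thread(threading.Lock()))   # True
# An RLock tracks its owner and refuses a release from another thread.
print(release_from_other_thread(threading.RLock()))  # False
```

So the manual mutex.release() would succeed silently even if the producer thread were the one holding the lock at that moment. The except RuntimeError branch only catches the case where the lock is already unlocked; it cannot tell that a different thread is the holder.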
How can I safely prevent these unreleased locks and keep threads from hanging?