
I'm working in Python 3.7 with multiple threads that communicate through queues. If my program is interrupted or hits an unexpected error, I would like it to clean up fully and exit gracefully. However, I have noticed that when a non-blocking Queue.get() call is interrupted (for example by SIGINT), the Queue sometimes never releases its lock, so my attempt to clean up any thread associated with that queue stalls forever. I then have to send more SIGINTs manually, which is undesirable for unsupervised execution.

This behavior can be seen in the following code. Pressing Ctrl-C usually does not exit the program immediately; it hangs instead. The final traceback shows that it is stuck waiting to acquire a lock.

import threading
import queue

def test_function(test_queue, thread_terminator):
    while True:
        try:
            test_queue.put_nowait("test")
        except queue.Full:
            pass

        if thread_terminator.is_set():
            return

if __name__ == '__main__':
    test_queue = queue.Queue()
    thread_terminator = threading.Event()
    test_thread = threading.Thread(target=test_function, args=(test_queue, thread_terminator))
    test_thread.start()

    while True:
        try:
            test_queue.get_nowait()
        except queue.Empty:
            pass
        except:
            thread_terminator.set()
            test_thread.join()
            raise

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 1281, in _shutdown
    t.join()
  File "/usr/local/lib/python3.7/threading.py", line 1032, in join
    self._wait_for_tstate_lock()
  File "/usr/local/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):

Why does this happen? Python queues are fully thread-safe, right? And the queue library at https://github.com/python/cpython/blob/master/Lib/queue.py seems to acquire its lock through context managers, so it seems to me that the lock should be released in the event of an exception.
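To make that expectation concrete, here is a minimal sketch of the ordinary case: an exception raised inside the body of a `with lock:` block does release the lock. The names `lock` and `critical_section` are illustrative and not taken from queue.py. This only covers exceptions raised inside the body; it says nothing about an interrupt that arrives at the moment the lock is being acquired, which may be the case that matters here.

```python
import threading

lock = threading.Lock()

def critical_section():
    # The context manager acquires the lock on entry and releases it on
    # exit, including when the body raises.
    with lock:
        raise ValueError("simulated failure inside the with body")

try:
    critical_section()
except ValueError:
    pass

# The lock is free again, so another thread could acquire it.
print(lock.locked())  # False
```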

I wrote some code to release the lock manually, and it seemed to fix the issue. Now, when exiting with SIGINT, I no longer see the hang.

import threading
import queue

def test_function(test_queue, thread_terminator):
    while True:
        try:
            test_queue.put_nowait("test")
        except queue.Full:
            pass

        if thread_terminator.is_set():
            return

if __name__ == '__main__':
    test_queue = queue.Queue()
    thread_terminator = threading.Event()
    test_thread = threading.Thread(target=test_function, args=(test_queue, thread_terminator))
    test_thread.start()

    while True:
        try:
            test_queue.get_nowait()
        except queue.Empty:
            pass
        except:
            thread_terminator.set()
            try:
                print("releasing lock")
                test_queue.mutex.release()
                print("lock released")
            except RuntimeError:
                print("lock already released")
            test_thread.join()
            raise

But this sketchy workaround doesn't seem ideal to me. What if another thread has legitimately acquired a lock on the queue, and this code screws that up? I haven't personally encountered that scenario yet, but am concerned.

How do I safely prevent these unreleased locks and prevent threads from hanging?
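One direction that avoids touching `mutex` at all is sketched below, on the assumption that the hang comes from KeyboardInterrupt being raised in the main thread while it is inside a queue call. Instead of letting SIGINT raise an exception, a handler only sets an Event, and both loops poll that Event. The names `stop_requested`, `request_stop`, `producer`, `consume_until_stopped` and `main` are hypothetical, and the bounded queue and 0.1 s timeouts are arbitrary choices rather than part of the original program.

```python
import queue
import signal
import threading

stop_requested = threading.Event()

def request_stop(signum, frame):
    # Signal handler: only sets a flag and never raises, so no exception
    # is injected into whatever queue call the main thread is running.
    stop_requested.set()

def producer(work_queue, stop_event):
    # Worker loop: keep producing until asked to stop.
    while not stop_event.is_set():
        try:
            work_queue.put("test", timeout=0.1)
        except queue.Full:
            pass

def consume_until_stopped(work_queue, stop_event):
    # Main loop: drain items until asked to stop; returns the item count.
    consumed = 0
    while not stop_event.is_set():
        try:
            work_queue.get(timeout=0.1)
            consumed += 1
        except queue.Empty:
            pass
    return consumed

def main():
    # signal.signal() must be called from the main thread.
    signal.signal(signal.SIGINT, request_stop)
    work_queue = queue.Queue(maxsize=100)
    worker = threading.Thread(target=producer, args=(work_queue, stop_requested))
    worker.start()
    try:
        consume_until_stopped(work_queue, stop_requested)
    finally:
        # Also stop the worker if the main loop fails for another reason.
        stop_requested.set()
        worker.join()
```

Calling `main()` and pressing Ctrl-C should then let both loops notice the flag and return, after which `worker.join()` completes. The trade-off is that shutdown is delayed by up to one timeout interval instead of being immediate.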

Kevin Guo
  • Might be related to this: https://stackoverflow.com/a/11436603/11877195 –  Aug 03 '19 at 15:25
    Thanks! The issue with that post was how to signal a thread to stop - but I already account for that by using an event to signal to the thread to exit the control loop. The problem here was that even though the thread had a signal to stop, it was frozen by an unreleased queue lock. – Kevin Guo Aug 05 '19 at 06:57
