I have N worker threads that all read from a single queue.SimpleQueue, to which the main program writes the tasks to perform. Once a thread takes a message from the queue, its task can run for several minutes, so if the main program is interrupted (e.g. by Ctrl-C / SIGINT) it could be a long time before all threads terminate. I'd like the program to terminate within a few seconds instead. The task itself is interruptible: the thread can periodically check a flag or event variable to see whether it must terminate.

So I've come up with the following mix of sentinel messages and event variables to do the trick:

import queue
import sys
import threading

# worker thread
def worker_loop(work_queue, thread_id, event):

  while True:
    end = False
    item = work_queue.get()
    # work_queue.task_done()   # not needed: SimpleQueue has no task_done()

    if item is None:
      print("Thread %s: terminating due to sentinel" % thread_id)
      end = True
    else:
      # perform task, periodically checking for event
      for part in range(10):
        print("Thread %s: doing job part %s (item %s)" % (thread_id, part, item))
        do_work(part = part)                # this lasts a few seconds
        # is_set() returns immediately; no need to wait with a timeout just to poll
        if event.is_set():
          print("Thread %s: terminating due to event" % thread_id)
          end = True
          break

    if end:
      return

# main
WORKERS = 5
THREADS = []

work_queue = queue.SimpleQueue()
event = threading.Event()

for thread_id in range(WORKERS):
  print("[Main] creating thread '%s'" % thread_id)
  t = threading.Thread(target = worker_loop, args = (work_queue, thread_id, event))
  THREADS.append(t)
  t.start()

# main code here...
# ...

# we received a signal, the time has come to terminate all threads...

# kill threads waiting on the queue
for i in range(WORKERS):   # one sentinel per worker
  work_queue.put(None)

# signal other threads to terminate ASAP
event.set()

# wait for all threads to terminate
for t in THREADS:
  t.join()

sys.exit()

My question is: is this method safe? It looks to me as if it is, but I figured I'd better check. Are there better/more elegant alternatives?
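
To make the intent concrete, here is a minimal, self-contained sketch of the same shutdown sequence. It is only an illustration under a few assumptions: do_work is a hypothetical stand-in that just sleeps briefly, the names stop_event, shutdown, run_demo and the log list are made up for the example, and Ctrl-C is assumed to surface as KeyboardInterrupt in the main thread. Unlike the code above, the worker here checks the event before each part rather than after it, so a task taken from the queue after shutdown has begun does no work at all.

```python
import queue
import threading
import time


def do_work(part):
    # Hypothetical stand-in for one slice of the real task.
    time.sleep(0.05)


def worker_loop(work_queue, thread_id, stop_event, log):
    while True:
        item = work_queue.get()  # blocks until a task or a sentinel arrives
        if item is None:
            log.append((thread_id, "sentinel"))
            return
        for part in range(10):
            # Check before each part, so a task picked up after shutdown
            # has started is abandoned immediately.
            if stop_event.is_set():
                log.append((thread_id, "event"))
                return
            do_work(part)


def shutdown(work_queue, threads, stop_event):
    stop_event.set()          # stops workers that are in the middle of a task
    for _ in threads:
        work_queue.put(None)  # wakes workers that are blocked on get()
    for t in threads:
        t.join()


def run_demo(workers=3, tasks=2):
    work_queue = queue.SimpleQueue()
    stop_event = threading.Event()
    log = []  # records why each worker exited
    threads = [
        threading.Thread(target=worker_loop, args=(work_queue, i, stop_event, log))
        for i in range(workers)
    ]
    for t in threads:
        t.start()
    for n in range(tasks):
        work_queue.put("task-%d" % n)
    try:
        time.sleep(0.1)  # stand-in for the main program; Ctrl-C would land here
    except KeyboardInterrupt:
        pass
    finally:
        shutdown(work_queue, threads, stop_event)
    return log, threads


if __name__ == "__main__":
    exit_log, _ = run_demo()
    for thread_id, reason in sorted(exit_log):
        print("Thread %s exited due to %s" % (thread_id, reason))
```

With as many sentinels as workers, a blocked worker can never be left without one, because each worker consumes at most one sentinel before exiting. Any tasks still in the queue at shutdown are simply left unprocessed.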

marc
  • Read [python-close-a-thread-on-multithreading](https://stackoverflow.com/questions/43683257/python-close-a-thread-on-multithreading/43686996?r=SearchResults&s=1|17.1090#43686996) – stovfl Sep 22 '19 at 15:32
  • That's about the normal usage of event variables. – marc Sep 22 '19 at 18:25
  • I need that, but I can also have threads waiting (blocked) on a queue as well, so my question is specifically about the combination of event variables + queue signalling. – marc Sep 22 '19 at 18:37
  • ***"threads waiting (blocked) on a queue"***: You have to use the **non-blocking** variant of `get(...)` on a `queue`. – stovfl Sep 22 '19 at 18:39
  • That could be a possibility, yes. I'd prefer to block so that when a task is sent a thread starts immediately instead of waiting for the next queue poll cycle. – marc Sep 22 '19 at 18:44
  • ***"I'd prefer to block"***: yes, **non-blocking** needs more lines of code, but there is **no way** to interrupt a **blocking** `queue.get(...)`. – stovfl Sep 22 '19 at 18:46
  • Of course there's a way, by sending a message as I do above. – marc Sep 22 '19 at 18:47
  • Yes, but that's not **interrupting**; of course you can use that approach as well. But it gets worse if multiple `Thread`s read from the same `queue`. – stovfl Sep 22 '19 at 18:48
  • Sending a special value down the queue is a commonly used approach to tell threads to shut down, see e.g. https://stackoverflow.com/a/7611097 or https://stackoverflow.com/a/35463638 – marc Sep 22 '19 at 18:59
  • Anyway, I found some more discussion of this issue here https://stackoverflow.com/questions/25116935/python-waiting-for-a-queue-and-an-event and https://stackoverflow.com/questions/12317940/python-threading-can-i-sleep-on-two-threading-events-simultaneously – marc Sep 22 '19 at 19:00
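
For comparison, the polling alternative raised in the comments might look like the sketch below. It is an illustration only: POLL_INTERVAL, polling_worker and run_polling_demo are hypothetical names, and appending to a results list stands in for the real task. It uses `get(timeout=...)` rather than a fully non-blocking `get`, which still returns as soon as an item arrives, so a new task does not wait for the next poll cycle; the interval only bounds how long an idle worker takes to notice the event.

```python
import queue
import threading
import time

POLL_INTERVAL = 0.05  # assumed value: upper bound on shutdown latency for an idle worker


def polling_worker(work_queue, stop_event, results):
    # No sentinel is needed: the loop ends once the event is observed.
    while not stop_event.is_set():
        try:
            # Blocks for at most POLL_INTERVAL, but returns immediately
            # when an item becomes available.
            item = work_queue.get(timeout=POLL_INTERVAL)
        except queue.Empty:
            continue  # nothing arrived; go back and re-check the event
        results.append(item)  # hypothetical stand-in for performing the task


def run_polling_demo(workers=3, tasks=5):
    work_queue = queue.SimpleQueue()
    stop_event = threading.Event()
    results = []
    threads = [
        threading.Thread(target=polling_worker, args=(work_queue, stop_event, results))
        for _ in range(workers)
    ]
    for t in threads:
        t.start()
    for n in range(tasks):
        work_queue.put(n)
    # Give the workers time to drain the queue (bounded so the demo cannot hang).
    deadline = time.monotonic() + 5.0
    while len(results) < tasks and time.monotonic() < deadline:
        time.sleep(0.01)
    stop_event.set()  # the only shutdown signal in this variant
    for t in threads:
        t.join()
    return results, threads


if __name__ == "__main__":
    done, _ = run_polling_demo()
    print("processed items:", sorted(done))
```

The trade-off is that idle workers wake up every POLL_INTERVAL seconds to re-check the event, whereas the sentinel approach leaves them fully blocked until there is something to do.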
