I have N worker threads that all read from a single queue.SimpleQueue, to which the main program writes the tasks to perform. Once a thread gets a message from the queue, its task can last for several minutes. So if the main program is interrupted (e.g. by Ctrl-C, which sends SIGINT), it could be a long time before all threads terminate. I'd like the program to terminate within a few seconds instead.
Fortunately, the task that a thread performs is interruptible: the thread can periodically check a flag or event variable to see whether it must terminate.
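To make "interruptible" concrete, here is a minimal sketch of the kind of cooperative check I mean. The name interruptible_task and the time.sleep stand-in for real work are placeholders for illustration only, not my actual task code.

```python
import threading
import time


def interruptible_task(stop_event, parts=10, part_duration=0.01):
    """Run up to `parts` chunks of work; return how many chunks completed."""
    done = 0
    for _ in range(parts):
        if stop_event.is_set():    # cooperative check between chunks
            break
        time.sleep(part_duration)  # stand-in for one chunk of real work
        done += 1
    return done
```

The task can only stop between chunks, so the worst-case reaction time is roughly the duration of one chunk.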
So I've come up with the following mix of sentinel messages and event variables to do the trick:
import queue
import sys
import threading


# worker thread
def worker_loop(work_queue, thread_id, event):
    while True:
        item = work_queue.get()
        # no task_done() call here: SimpleQueue does not provide it
        if item is None:
            print("Thread %s: terminating due to sentinel" % thread_id)
            return
        # perform task, periodically checking for event
        for part in range(10):
            print("Thread %s: doing job part %s (item %s)" % (thread_id, part, item))
            do_work(part=part)  # defined elsewhere; each call lasts a few seconds
            if event.wait(timeout=0.01):
                print("Thread %s: terminating due to event" % thread_id)
                return


# main
WORKERS = 5
THREADS = []
work_queue = queue.SimpleQueue()
event = threading.Event()
for thread_id in range(WORKERS):
    print("[Main] creating thread '%s'" % thread_id)
    t = threading.Thread(target=worker_loop, args=(work_queue, thread_id, event))
    THREADS.append(t)
    t.start()

# main code here...
# ...

# we received a signal, the time has come to terminate all threads...
# wake up threads waiting on the queue: one sentinel per worker
for _ in range(WORKERS):
    work_queue.put(None)
# signal busy threads to terminate ASAP
event.set()
# wait for all threads to terminate
for t in THREADS:
    t.join()
sys.exit()
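In case it helps anyone try this out, here is a condensed, self-contained sketch of the same shutdown sequence. It rests on assumptions: time.sleep stands in for the real work, and the names worker, run_and_shutdown and finished are made up for this demo. The finished list only records why each worker exited.

```python
import queue
import threading
import time


def worker(work_queue, stop_event, finished):
    while True:
        item = work_queue.get()
        if item is None:             # sentinel: an idle worker wakes up and exits
            finished.append("sentinel")
            return
        for _ in range(10):
            time.sleep(0.01)         # stand-in for one part of a long job
            if stop_event.is_set():  # a busy worker abandons its job
                finished.append("event")
                return


def run_and_shutdown(num_workers=3, num_tasks=2):
    work_queue = queue.SimpleQueue()
    stop_event = threading.Event()
    finished = []                    # each worker appends exactly once
    threads = [
        threading.Thread(target=worker, args=(work_queue, stop_event, finished))
        for _ in range(num_workers)
    ]
    for t in threads:
        t.start()
    for task in range(num_tasks):
        work_queue.put(task)
    time.sleep(0.03)                 # let some workers get busy

    # shutdown: one sentinel per worker, then the event, then join
    for _ in threads:
        work_queue.put(None)
    stop_event.set()
    for t in threads:
        t.join(timeout=5)
    return finished, [t.is_alive() for t in threads]


if __name__ == "__main__":
    reasons, alive = run_and_shutdown()
    print(reasons, alive)
```

Workers that exit because of the event never consume their sentinel, so some None items can be left in the queue at exit. In my real program the shutdown part would run once the main thread sees the interrupt (for instance from an except KeyboardInterrupt block).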
My question is: is this method safe? It looks to me as if it is, but I figured I'd better check. Are there better/more elegant alternatives?