The official Python documentation for the queue module gives the following example:

from queue import Queue
from threading import Thread

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

I want to make sure all the threads are killed at this point before my main thread proceeds. I suppose after all the tasks in the queue have been processed, the q.get() method will raise an exception, which should kill the thread. Is that correct?

Dr. Alpha

2 Answers

No. If there are no items in the queue, get will, by default, wait for items to be put into the queue. If you want it to raise an exception when there are no more items, either pass it block=False or use get_nowait.
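
A minimal sketch of the difference, using only the standard-library queue module. The 0.1-second timeout is an arbitrary value chosen so the example finishes quickly:

```python
import queue

q = queue.Queue()
results = []

# Both non-blocking forms raise queue.Empty immediately on an empty queue.
for getter in (lambda: q.get(block=False), q.get_nowait):
    try:
        getter()
    except queue.Empty:
        results.append("empty")

# A blocking get with a timeout waits up to that many seconds,
# then raises queue.Empty as well.
try:
    q.get(timeout=0.1)
except queue.Empty:
    results.append("timed out")

# Once an item is available, every form simply returns it.
q.put("job")
results.append(q.get_nowait())

print(results)
```

A plain q.get() with no arguments on the empty queue would never return here, which is why the workers in the question do not exit on their own.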

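Once you're using the non-blocking get, it should all work, provided all items are put into the queue before the workers start (in the question's example the workers start first, so they could find the queue empty and exit before any work arrives). It is rather inelegant, though, for your threads to die because of an unhandled exception in the normal case. I'd recommend wrapping the call in a try block and, if it raises queue.Empty, stopping the thread cleanly: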

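import queue  # provides the queue.Empty exception

# inside worker():
try:
    item = q.get(block=False)
except queue.Empty:
    return  # queue is empty: stop this thread cleanly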
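
Put together, the approach might look like the sketch below. The do_work body, the item range, and the thread count are placeholders made up for illustration. The queue is filled before the workers start, since otherwise a worker could see an empty queue and return early:

```python
import queue
import threading

def do_work(item):
    # Hypothetical stand-in for the real task.
    return item * 2

def worker(q, results):
    while True:
        try:
            item = q.get(block=False)
        except queue.Empty:
            return  # nothing left to do: end the thread cleanly
        results.append(do_work(item))
        q.task_done()

q = queue.Queue()
for item in range(10):   # fill the queue before starting any worker
    q.put(item)

results = []
threads = [threading.Thread(target=worker, args=(q, results)) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()             # every worker has returned after this point

print(sorted(results))
```

Because the workers return by themselves, joining the threads replaces q.join() and guarantees that none of them is still running.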
icktoofay

If any call to do_work() raises an exception, the thread running it exits. Your main thread then blocks forever on q.join(), because that q.get() was never followed by q.task_done().
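
One possible way to guard against this while keeping the original structure (a sketch, not something the documentation example does) is to call q.task_done() in a finally clause. The failing do_work and the errors list below are hypothetical, added only to trigger and record the failure:

```python
import queue
import threading

def do_work(item):
    # Hypothetical task that fails for one particular input.
    if item == 3:
        raise ValueError("bad item")
    return item

errors = []

def worker(q):
    while True:
        item = q.get()
        try:
            do_work(item)
        except Exception as exc:
            errors.append((item, exc))  # record the failure, keep the thread alive
        finally:
            q.task_done()               # always balance the q.get() above

q = queue.Queue()
for _ in range(2):
    threading.Thread(target=worker, args=(q,), daemon=True).start()

for item in range(6):
    q.put(item)

q.join()  # returns even though one task raised
print([item for item, _ in errors])
```

The daemon workers are still alive, blocked in q.get(), after q.join() returns, so this fixes the hang but not the original wish to have the threads gone.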

You could rewrite the example using a thread pool:

from multiprocessing.dummy import Pool # use threads

p = Pool(num_worker_threads)    
for _ in p.imap_unordered(do_work, source()):
    pass
p.close()
p.join() # no threads after this point

In this case, if do_work() raises an exception, it is propagated to the main thread, which then exits (pool threads are daemonic, so they do not keep the program running).
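
To see the propagation, here is a small sketch in which a hypothetical do_work fails for one input. Catching the exception in the main thread is an addition for demonstration; without the except clause the program would simply terminate with a traceback:

```python
from multiprocessing.dummy import Pool  # thread-based Pool

def do_work(item):
    # Hypothetical task that fails for one particular input.
    if item == 3:
        raise ValueError("bad item: %d" % item)
    return item

caught = None
pool = Pool(2)
try:
    for _ in pool.imap_unordered(do_work, range(6)):
        pass
except ValueError as exc:
    caught = exc   # raised in a pool thread, re-raised here while iterating
finally:
    pool.close()
    pool.join()    # pool threads have finished after this point

print(repr(caught))
```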

Another alternative is to keep the queue but put sentinel values into it (one per thread) and have worker() exit when it encounters one. For example:

STOP = object()

def worker(queue):
    for item in iter(queue.get, STOP): # until STOP is encountered
        do_work(item)

# instead of `q.join()`
for _ in threads: q.put(STOP)
for t in threads: t.join() # no threads after this point
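
A complete, runnable version of the sentinel pattern could look like this. The do_work body, the input range, and the results list are placeholders assumed for the sake of the example:

```python
import queue
import threading

STOP = object()  # unique sentinel that cannot collide with a real item

def do_work(item):
    # Hypothetical stand-in for the real task.
    return item * item

def worker(q, results):
    # iter(callable, sentinel) keeps calling q.get() until it returns STOP.
    for item in iter(q.get, STOP):
        results.append(do_work(item))

num_worker_threads = 3
q = queue.Queue()
results = []

threads = [threading.Thread(target=worker, args=(q, results))
           for _ in range(num_worker_threads)]
for t in threads:
    t.start()

for item in range(8):
    q.put(item)

for _ in threads:
    q.put(STOP)   # one sentinel per worker, queued after all real items
for t in threads:
    t.join()      # no worker threads remain after this point

print(sorted(results))
```

Because the queue is FIFO and the sentinels are queued last, every real item is taken before any worker sees STOP, and each worker consumes exactly one sentinel.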
jfs