I am implementing a multithreaded producer-consumer pattern using Queue.Queue in Python 2.7, and I am trying to figure out how to make the consumers, i.e. the worker threads, stop once all required work is done.
See the second comment by Martin James to this answer: https://stackoverflow.com/a/19369877/1175080
Send an 'I am finished' task, instructing the pool threads to terminate. Any thread that gets such a task requeues it and then commits suicide.
But this does not work for me. See the following code for example.
import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1:  # -1 is used to indicate that the worker should stop
            # Requeue the exit indicator.
            q.put(-1)
            # Commit suicide.
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send an exit indicator for all threads to consume.
    q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()
This program hangs after all three workers have read the exit indicator, i.e. -1, from the queue, because each worker requeues -1 before exiting. The last requeued -1 is never consumed, so the queue never becomes empty and q.join() never returns.
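To check that reasoning without any threads involved, here is a minimal single-threaded sketch of what I believe happens to the queue. It replays the get / task_done / requeue sequence once per worker. The name `leftover` and the try/except import (so that the snippet also runs on Python 3) are my own additions and not part of the program above.

```python
try:
    import queue            # Python 3 module name
except ImportError:
    import Queue as queue   # Python 2.7 module name

q = queue.Queue()
q.put(-1)                   # the single exit indicator sent by master()

# Replay what each of the 3 workers does when it receives -1.
for _ in range(3):
    data = q.get()
    q.task_done()           # marks the received -1 as processed
    q.put(data)             # requeue: this counts as a new unfinished item

# The last requeued -1 is still in the queue and has no matching
# task_done(), so a q.join() call at this point would block.
leftover = q.qsize()
print('items left in queue: %d' % leftover)
```

If this sketch is right, the hang does not depend on thread timing: however many workers there are, the last one to exit always leaves one -1 behind.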
I came up with the following workaround, in which I send one -1 exit indicator per worker via the queue, so that each worker can see one and commit suicide. But having to send a separate exit indicator for each worker feels a little ugly.
import Queue
import threading
import time

def worker(n, q):
    # n - Worker ID
    # q - Queue from which to receive data
    while True:
        data = q.get()
        print 'worker', n, 'got', data
        time.sleep(1)  # Simulate noticeable data processing time
        q.task_done()
        if data == -1:  # -1 is used to indicate that the worker should stop
            print 'worker', n, 'is exiting'
            break

def master():
    # master() sends data to worker() via q.
    q = Queue.Queue()

    # Create 3 workers.
    for i in range(3):
        t = threading.Thread(target=worker, args=(i, q))
        t.start()

    # Send 10 items to work on.
    for i in range(10):
        q.put(i)
        time.sleep(0.5)

    # Send one stop indicator for each worker.
    for i in range(3):
        q.put(-1)

    print 'waiting for workers to finish ...'
    q.join()
    print 'done'

master()
I have two questions.
- Can the method of sending a single exit indicator for all threads (as explained in the second comment of https://stackoverflow.com/a/19369877/1175080 by Martin James) even work?
- If the answer to the previous question is "No", is there a way to solve the problem in a way that I don't have to send a separate exit indicator for each worker thread?