Given a pretty standard read/write multithreaded process with a read Queue and a write Queue:
8 times worker done
is printed, but the join() statement is never passed. But if I replace queue_out.put(r)
by `queue_out.put(1) it works.
This is melting my brain, probably something really stupid. Should I make a copy of my dictionary and put that in the return Queue? Did I make a stupid mistake somewhere?
Process function
def reader(queue_in, queue_out, funktion):
# Read from the queue
while True:
r = queue_in.get()
if r == 'DONE':
return
funktion(r) # funktion adds additional keys to the dictionary
queue_out.put(r) # <---- replacing r by 1 does let me join()
print "worker done" # <----- this happens
Populate the input queue
def writer(generator, queue):
# Write to the queue
for r in enumerate(generator):
# r is a complex dictionary
queue.put(r)
print "writer done"
for _ in range(0, WORKERS):
queue.put((-1, "DONE"))
The rest
WORKERS = 8
# init Queues
queue_in = Queue()
queue_out = Queue()
# Start processes, with input and output quests
readers = []
for _ in range(0, WORKERS):
p = Process(target=reader, args=(queue_in, queue_out, funktion))
p.daemon = True
p.start()
readers.append(p)
writer(generator, queue_in)
for p in readers:
p.join()
print "joined" # <---- this never happens
queue_in.close()
while not queue_out.empty():
print queue_out.get()
queue_out.close()