In Python 2.7 I have implemented a multiprocessing scenario with multiple queues and consumers. The simplified idea is that I have a producer of jobs, which are fed to a consumer that handles them, plus an error handler that does all the logging. Very much simplified, it looks something like this:
import multiprocessing as mp
import Queue

job_queue = mp.Queue()
error_queue = mp.Queue()

# Fill the job queue with some dummy work.
for i in range(10):
    job_queue.put(i)

def job_handler(job_queue, error_queue):
    print 'Job handler'
    while True:
        try:
            element = job_queue.get_nowait()
            print element
        except Queue.Empty:
            # t1
            error_queue.put('Error')
            error_queue.close()
            error_queue.join_thread()
            job_queue.close()
            job_queue.join_thread()
            # t2
            return 1

def error_handler(error_queue):
    result = error_queue.get()
    if result == 'Error':
        error_queue.close()
        error_queue.join_thread()

if __name__ == '__main__':
    print 'Starting'
    p1 = mp.Process(target=error_handler, args=(error_queue,))
    p1.start()
    p2 = mp.Process(target=job_handler, args=(job_queue, error_queue))
    p2.start()
This basically works, but in my more complex program there is a very long delay (about 5 minutes) between the two comment markers t1 and t2. So I have two questions:
- Do I understand correctly that every process should call close() and join_thread() on every Queue object it uses, to indicate that it is done with it (see the first sketch after the questions)? I think subprocesses do this implicitly when they end, for example by returning, as stated here:
join_thread() Join the background thread. This can only be used after close() has been called. It blocks until the background thread exits, ensuring that all data in the buffer has been flushed to the pipe.
By default if a process is not the creator of the queue then on exit it will attempt to join the queue’s background thread. The process can call cancel_join_thread() to make join_thread() do nothing.
- How can I figure out why the join takes such a long time (see the second sketch after the questions)?
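Regarding the first question, my reading of the quoted docs is that a process which is not the queue's creator implicitly joins the feeder thread on exit, and that cancel_join_thread() would make that join a no-op. A minimal sketch of what I mean, reusing the names from my example above; I have not tried this in the real program, and the docs suggest it can lose buffered data:

def job_handler_no_join(job_queue, error_queue):
    # Same loop as job_handler above, but skip the implicit
    # join_thread() on process exit by cancelling it first.
    while True:
        try:
            print job_queue.get_nowait()
        except Queue.Empty:
            error_queue.put('Error')
            # cancel_join_thread() makes the implicit join_thread() on
            # exit do nothing; per the docs quoted above, data still
            # sitting in the queue's buffer may then be lost.
            error_queue.cancel_join_thread()
            job_queue.cancel_join_thread()
            return 1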
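Regarding the second question, to at least narrow down which of the four calls between t1 and t2 is the one that blocks, I time each call individually, roughly like this (timed is just a hypothetical helper for this post):

import time

def timed(label, fn):
    # Hypothetical helper: report how long a single blocking call takes.
    start = time.time()
    fn()
    print '%s took %.2f s' % (label, time.time() - start)

# Inside job_handler, instead of the bare calls between t1 and t2:
timed('error_queue.close', error_queue.close)
timed('error_queue.join_thread', error_queue.join_thread)
timed('job_queue.close', job_queue.close)
timed('job_queue.join_thread', job_queue.join_thread)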