I process some data by dispatching it into a queue, which is consumed by workers that in turn put their results into another queue, which is finally read to produce the final result. The example below puts the number 1 into the queue self.size times, and for each item the workers put back the value 10:
import multiprocessing
import Queue

class Dispatch():
    def __init__(self):
        self.q_in = multiprocessing.Queue()
        self.q_out = multiprocessing.Queue()
        self.maxproc = 4
        self.size = 1000

    def putdata(self):
        # put data to the queue
        for d in range(self.size):
            self.q_in.put(1)
        # put sentinels at the end of the FIFO queue
        for i in range(self.maxproc):
            self.q_in.put("STOP")

    def processing(self):
        procs = []
        for i in range(self.maxproc):
            p = multiprocessing.Process(target=self.multiply)
            procs.append(p)
            p.start()
        return procs

    def multiply(self):
        while True:
            item = self.q_in.get()
            if item == 'STOP':
                print("{0} exiting".format(multiprocessing.current_process()))
                return
            self.q_out.put(item * 10)

    def getdata(self):
        total = 0
        while True:
            try:
                item = self.q_out.get(block=False)
                total += item
            except Queue.Empty:
                print("done getdata")
                break
        print("data out: {0}".format(total))

if __name__ == "__main__":
    dis = Dispatch()
    procs = dis.processing()
    dis.putdata()
    for p in procs:
        p.join()
    dis.getdata()
    print("done __main__")
This code breaks when self.size gets larger. For instance, it works for self.size = 1000:
<Process(Process-1, started)> exiting
<Process(Process-4, started)> exiting
<Process(Process-2, started)> exiting
<Process(Process-3, started)> exiting
done getdata
data out: 10000
done __main__
However, for self.size = 10000 it outputs
<Process(Process-2, started)> exiting
<Process(Process-4, started)> exiting
<Process(Process-1, started)> exiting
<Process(Process-3, started)> exiting
and hangs. It looks like it hangs around the join() calls (the worker processes have returned, but getdata() is never reached).
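For what it's worth, one way to confirm that the hang is at join() is to join with a timeout and report which workers are still alive; this is just a rough diagnostic sketch (the timeout value and extra prints are not part of the code above):

    # diagnostic sketch: replace the plain join() loop in __main__
    # with a timed join and report which workers are still alive
    for p in procs:
        p.join(timeout=5)  # give each worker a few seconds instead of blocking forever
        print("{0} alive after join: {1}".format(p.name, p.is_alive()))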
Why is this? Is there a limitation on the size of the queue? (I monitored memory and CPU during the run and they were fine.) Or is it something else?