
I process some data which I dispatch into a queue, which is consumed by workers, which in turn put their results into another queue, which is finally parsed to get the final result. The example below puts the number 1 into the queue self.size times, and the workers put back the value 10:

import multiprocessing
import Queue

class Dispatch():
    def __init__(self):
        self.q_in = multiprocessing.Queue()
        self.q_out = multiprocessing.Queue()
        self.maxproc = 4
        self.size = 1000

    def putdata(self):
        # put data to the queue
        for d in range(self.size):
            self.q_in.put(1)
        # put sentinels at the end of the FIFO queue
        for i in range(self.maxproc):
            self.q_in.put("STOP")

    def processing(self):
        procs = []
        for i in range(self.maxproc):
            p = multiprocessing.Process(target=self.multiply)
            procs.append(p)
            p.start()
        return procs

    def multiply(self):
        while True:
            item = self.q_in.get()
            if item == 'STOP':
                print("{0} exiting".format(multiprocessing.current_process()))
                return
            self.q_out.put(item * 10)

    def getdata(self):
        total = 0
        while True:
            try:
                item = self.q_out.get(block=False)
                total += item
            except Queue.Empty:
                print("done getdata")
                break
        print("data out: {0}".format(total))

if __name__=="__main__":
    dis = Dispatch()
    procs = dis.processing()
    dis.putdata()
    for p in procs:
        p.join()
    dis.getdata()
    print("done __main__")

This code breaks when self.size gets larger. For instance it works for self.size = 1000:

<Process(Process-1, started)> exiting
<Process(Process-4, started)> exiting
<Process(Process-2, started)> exiting
<Process(Process-3, started)> exiting
done getdata
data out: 10000
done __main__

However, for self.size = 10000 it outputs

<Process(Process-2, started)> exiting
<Process(Process-4, started)> exiting
<Process(Process-1, started)> exiting
<Process(Process-3, started)> exiting

and hangs. It looks like it hangs around the join() calls (the processes have returned, but getdata() is never reached).

Why is that? Is there a limitation on the size of the queue? (I monitored memory and CPU during the run and they are fine.) Or is it something else?

WoJ
  • Perhaps `getdata` _does_ get reached, but it takes a very long time to finish the `while True` loop, so it appears to hang. Try putting a `print("getting item...")` inside the `try`, just to see if it's running. – Kevin Dec 01 '14 at 19:53
  • This happens because the `multiprocessing.Queue` in the child process isn't able to flush all the queued data to its internal `Pipe`, so it gets buffered in memory. The child process can't exit until all the buffered data is flushed to the `Pipe`, but that won't happen until you start reading from the other end of the `Queue`. You don't try to read from the other end of the `Pipe` until *after* the child exits, so you deadlock. See the dupe for more details. – dano Dec 01 '14 at 20:00
  • call `getdata` before `join`, but make sure those are blocking get's on the queue. – tdelaney Dec 01 '14 at 20:08
  • @dano: thanks for pointing out the duplicate. This is exactly my case and the solution works perfectly. – WoJ Dec 02 '14 at 10:39
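Following the comments above, here is a minimal sketch (my own illustration, not code from the question) of the suggested fix: drain q_out with blocking get() calls *before* joining the workers, counting forwarded sentinels to know when all workers are done. The module-level worker function and the sentinel-forwarding are assumptions made to keep the example self-contained.

```python
import multiprocessing

def multiply(q_in, q_out):
    # Worker: multiply each item by 10; forward the sentinel when done.
    while True:
        item = q_in.get()
        if item == 'STOP':
            q_out.put('STOP')
            return
        q_out.put(item * 10)

def main(size=10000, maxproc=4):
    q_in = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=multiply, args=(q_in, q_out))
             for _ in range(maxproc)]
    for p in procs:
        p.start()
    for _ in range(size):
        q_in.put(1)
    for _ in range(maxproc):
        q_in.put('STOP')
    # Drain the output queue BEFORE joining, so the workers' queue
    # feeder threads can flush their buffers and the processes can
    # exit. Blocking get() also avoids the race where Queue.Empty is
    # raised while workers are still producing.
    total, stops = 0, 0
    while stops < maxproc:
        item = q_out.get()
        if item == 'STOP':
            stops += 1
        else:
            total += item
    # Now joining is safe: the queues have been fully consumed.
    for p in procs:
        p.join()
    print("data out: {0}".format(total))
    return total

if __name__ == "__main__":
    main()
```

With the original ordering, each worker blocks at exit while its queue feeder thread waits to flush buffered items into the pipe; join() then waits on the worker, and getdata() (the only reader) never runs. Reading before joining breaks that cycle.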
