
I would like to put output data into a queue in a multiprocessing computation. When the returned data is too large, the program hangs. Here is a minimal example that illustrates the problem. Can anyone help me make this work?

from multiprocessing import Process, Queue
import numpy as np

def foo(q, qid):
    x = np.random.randint(0,5,7)
    y = np.random.random(100*10*10).reshape(100,10,10)
    q.put([qid,x,y])

def main():
    processes = []
    q = Queue()

    for qid in range(5):
        p = Process(target=foo, args=(q, qid))
        p.start()
        processes.append(p)

    for process in processes:
        process.join()

    for qid in range(5):
        [_, x, y] = q.get()
        print(x)
        print(y)

if __name__ == '__main__':
    main()
asked by drbombe, edited by Tomerikoo
  • What do you mean "stuck"? No error message? Locked up? Or was it just taking a long time? Start with smaller data to ensure your program works as expected and slowly increase the data size. – RufusVS Mar 26 '21 at 20:19
  • Smaller data works well. For the example above, it just hangs without any error message. In the debugger, execution gets as far as q.put(). – drbombe Mar 26 '21 at 20:24
  • There may be good answers [here](https://stackoverflow.com/questions/31552716/multiprocessing-queue-full#34035902). Perhaps you can go somewhat larger by giving each process its own queue, but as the link above explains, there are underlying implementation issues with the queue. – RufusVS Mar 27 '21 at 00:02
  • Does this answer your question? [Deadlock with big object in multiprocessing.Queue](https://stackoverflow.com/questions/59196165/deadlock-with-big-object-in-multiprocessing-queue) – Darkonaut Mar 30 '21 at 21:43
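One way to see why "too large" matters is to measure how many bytes each worker sends. A multiprocessing.Queue pickles every item and a background thread writes those bytes into an OS pipe, whose buffer is limited (64 KiB by default on Linux according to pipe(7); other platforms differ). The helper name payload_size below is hypothetical and only reproduces the shapes used in foo().

```python
import pickle

import numpy as np


def payload_size(qid=0):
    """Bytes in the pickled [qid, x, y] item that one worker put()s."""
    # Same shapes as foo() in the question.
    x = np.random.randint(0, 5, 7)
    y = np.random.random(100 * 10 * 10).reshape(100, 10, 10)
    # multiprocessing.Queue pickles each item before writing it to a pipe,
    # so this approximates how much data each put() has to push through.
    return len(pickle.dumps([qid, x, y]))


print(payload_size())  # a little over 80,000: y alone is 10,000 float64 values
```

Because each item is larger than a typical pipe buffer, a child cannot finish flushing its item until the parent starts reading from the queue.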

1 Answer


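One solution I found is to swap the join and get loops: read all results from the queue first, then join the processes. By default, get() blocks until an item is available, so the main process still waits for every worker. The original order deadlocks because, as the programming guidelines in the Python multiprocessing documentation warn, a process that has put items on a queue waits before terminating until all buffered items have been flushed into the underlying pipe. With large items the pipe fills up, so the children cannot exit, and join() waits forever on children that are themselves waiting for the parent to call get().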

from multiprocessing import Process, Queue
import numpy as np

def foo(q, qid):
    x = np.random.randint(0,5,7)
    y = np.random.random(100*10*10).reshape(100,10,10)
    q.put([qid,x,y])

def main():
    processes = []
    q = Queue()

    for qid in range(5):
        p = Process(target=foo, args=(q, qid))
        p.start()
        processes.append(p)

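    # Drain the queue first: get() blocks until each worker's result arrives.
    for qid in range(5):
        [_, x, y] = q.get()
        print(x)
        print(y)

    # Join only after every item has been read, so no child is left
    # waiting to flush its data into a full pipe.
    for process in processes:
        process.join()

if __name__ == '__main__':
    main()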
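A slightly more defensive sketch of the same drain-before-join idea, assuming you want results keyed by qid (items arrive in completion order, not qid order) and do not want to hang forever if a worker crashes before calling put(). The function name collect and the 30-second timeout are illustrative choices, not part of the original code; q.get(timeout=...) raises queue.Empty when nothing arrives in time.

```python
from multiprocessing import Process, Queue

import numpy as np


def foo(q, qid):
    # Same worker as in the question.
    x = np.random.randint(0, 5, 7)
    y = np.random.random(100 * 10 * 10).reshape(100, 10, 10)
    q.put([qid, x, y])


def collect(n_workers=5, timeout=30.0):
    """Start n_workers processes and return {qid: (x, y)}."""
    q = Queue()
    processes = [Process(target=foo, args=(q, qid)) for qid in range(n_workers)]
    for p in processes:
        p.start()

    results = {}
    try:
        # Drain first: one get() per worker. The timeout turns a worker that
        # died before put() into a queue.Empty error instead of a silent hang.
        for _ in range(n_workers):
            qid, x, y = q.get(timeout=timeout)
            results[qid] = (x, y)  # arrival order is not qid order
    finally:
        # Join only after draining; on the error path, stop any stragglers.
        for p in processes:
            p.join(timeout)
            if p.is_alive():
                p.terminate()
                p.join()
    return results


if __name__ == "__main__":
    for qid, (x, y) in sorted(collect().items()):
        print(qid, x, y.shape)
```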
answered by drbombe
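If the queue is only used to return one result per worker, a different technique, concurrent.futures.ProcessPoolExecutor, avoids managing the queue and the join order by hand: each task simply returns its result and the executor transports it back to the parent. This is a sketch under the assumption that foo() can be rewritten to return its data; work and run_all are hypothetical names.

```python
from concurrent.futures import ProcessPoolExecutor

import numpy as np


def work(qid):
    # Hypothetical variant of foo(): returns the result instead of using a queue.
    x = np.random.randint(0, 5, 7)
    y = np.random.random(100 * 10 * 10).reshape(100, 10, 10)
    return qid, x, y


def run_all(n_workers=5):
    with ProcessPoolExecutor(max_workers=n_workers) as pool:
        # map() yields results in input order; the executor reads them in the
        # parent and shuts the workers down when the with-block exits.
        return list(pool.map(work, range(n_workers)))


if __name__ == "__main__":
    for qid, x, y in run_all():
        print(qid, x)
        print(y.shape)
```

Because map() preserves input order, the results come back sorted by qid without any extra bookkeeping.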