19

In Python 2.7, multiprocessing.Queue throws a broken pipe error when used inside a function. I am providing a minimal example that reproduces the problem.

#!/usr/bin/python
# -*- coding: utf-8 -*-

import multiprocessing

def main():
    q = multiprocessing.Queue()
    for i in range(10):
        q.put(i)

if __name__ == "__main__":
    main()

throws the broken pipe error below:

Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
IOError: [Errno 32] Broken pipe

Process finished with exit code 0

I am unable to decipher why. It would certainly be strange that we cannot populate Queue objects from inside a function.

hAcKnRoCk

3 Answers

27

When you call Queue.put(), an implicit feeder thread is started to deliver the data into the queue's pipe. Meanwhile, the main application finishes, and there is no receiving end for the data (the queue object is garbage-collected).

I would try this:

from multiprocessing import Queue

def main():
    q = Queue()
    for i in range(10):
        print i
        q.put(i)
    q.close()
    q.join_thread()

if __name__ == "__main__":
    main()

join_thread() ensures that all data in the buffer has been flushed. close() must be called before join_thread().
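For completeness, a small sketch (Python 3 syntax; the mechanics are the same in 2.7) that also reads the items back before closing, showing the data really makes it through the pipe:

```python
import multiprocessing

def main():
    q = multiprocessing.Queue()
    for i in range(10):
        q.put(i)
    # Reading the items back gives the feeder thread a receiving end,
    # so nothing is left stranded in the pipe.
    items = [q.get() for _ in range(10)]
    q.close()        # no more puts on this queue
    q.join_thread()  # block until the feeder thread has flushed its buffer
    return items
```

With the queue drained and joined, the function exits cleanly instead of racing the feeder thread at garbage collection.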

Peter Svac
  • Note that this has indeed been [fixed](https://github.com/python/cpython/issues/80025) in Python 3.11 and was backported to [3.9.13](https://github.com/python/cpython/pull/92277) and [3.10.5](https://github.com/python/cpython/pull/92276) (based on PR merge date and py release schedules) – Max Aug 09 '23 at 08:52
15

EDIT: please use @Peter Svac's answer, which is better. Using join_thread() ensures the Queue does its job in a much better fashion than the time.sleep(0.1) I proposed.

What happens here is that when you call main(), it creates the Queue, puts 10 objects into it, and ends the function, garbage-collecting all of its local variables and objects, including the Queue. BUT you get this error because the feeder thread is still trying to send the last number through the pipe.

From the documentation:

"When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe."

As the put() is performed by another thread, it does not block the execution of the script, which allows main() to end before the Queue operations are complete.
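To make that feeder thread visible, here is a sketch that inspects the thread list before and after the first put(). Note the thread name `QueueFeederThread` is a CPython implementation detail, not a documented API:

```python
import multiprocessing
import threading

def feeder_demo():
    q = multiprocessing.Queue()
    before = {t.name for t in threading.enumerate()}
    q.put(1)  # the first put() starts the feeder thread
    started = {t.name for t in threading.enumerate()} - before
    q.get()            # drain the queue so shutdown is clean
    q.close()
    q.join_thread()
    return started
```

The set returned contains the newly started feeder thread, confirming that put() hands the data off to a background thread rather than writing to the pipe itself.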

Try this :

#!/usr/bin/python
# -*- coding: utf-8 -*-

import multiprocessing
import time
def main():
    q = multiprocessing.Queue()
    for i in range(10):
        print i
        q.put(i)
    time.sleep(0.1) # Just enough to let the Queue finish

if __name__ == "__main__":
    main()

There should be a way to join the Queue or block execution until the objects are put in the Queue; you should take a look at the documentation.

CoMartel
-2

With a delay using time.sleep(0.1) as suggested by @HarryPotFleur, this problem is solved. However, I tested the code with Python 3 and the broken pipe issue does not happen at all in Python 3. I think this was reported as a bug and later got fixed.

hAcKnRoCk
    It's **not true** that it doesn't happen in Python 3 at all. What's more, `time.sleep(0.1)` isn't a solution! It was just for understanding! – S.R Apr 07 '17 at 15:07
    Using a sleep is not a solution, it's a hacky workaround. And this does happen in python3. https://bugs.python.org/issue35844 – medley56 Nov 06 '19 at 19:46