
Note 1: I want to use multiprocessing.Queue across multiple processes, but I found that this issue occurs even in a single-process situation. So, the following code uses a single process to simplify the question.

There is a similar question: Broken pipe error with multiprocessing.Queue.

The answer in that post shows that this issue occurs because the main thread exits before the queue's background thread finishes its job. The way he fixed it was by adding sleep(0.1) to his code:

import multiprocessing
import time
def main():
    q = multiprocessing.Queue()
    for i in range(10):
        print(i)
        q.put(i)
    time.sleep(0.1) # Just enough to let the Queue finish

if __name__ == "__main__":
    main()

But I think sleep is not a reliable approach for production code, so I tried to use join instead. You can see my code below, but unfortunately it does not work. Does anyone know how to do this without sleep?

import multiprocessing
import time


def main():
    q = multiprocessing.Queue()
    for i in range(10):
        q.put(i)
    # time.sleep(4)
    q.close()        # no more data will be put on this queue
    q.join_thread()  # wait for the background thread to flush the buffer

if __name__ == "__main__":
    main() 
scott huang
  • What do you want to do? Use multiprocessing.Queue in just one process? – Sraw Oct 24 '17 at 03:45
  • Please see my edited question. In short, I want to use it with multiple processes, but when I raised this question, I chose to simplify it with minimal code. – scott huang Oct 24 '17 at 04:39
  • You will almost never meet this issue with multiple processes, because you will have a consumer process which can block the main process. – Sraw Oct 24 '17 at 04:46
  • 1
    Yes, I try it with multi-process, it works fine. but I still curious why this happened in single process. The source of this question(code from my project) have issue, even with multiprocesing, I will try to give another simplified code. – scott huang Oct 24 '17 at 05:29

2 Answers


Let us first describe some details of multiprocessing.Queue.

When an object is put on a queue, it is placed on an internal buffer; a background thread later pickles the buffered object and flushes the pickled data to an underlying pipe.
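
A small sketch of this behaviour (the sleep is only there to make the asynchrony visible; get() would block until the background thread has flushed the item anyway):

import multiprocessing
import time

q = multiprocessing.Queue()
q.put("hello")   # returns immediately: the object only lands on an internal buffer
time.sleep(0.1)  # meanwhile, the background thread pickles it and writes it to the pipe
print(q.get())   # reads the pickled data back from the pipe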

The two ends of the pipe behave like the pair created by reader, writer = socket.socketpair().

queue.close() is designed for the multi-process case and does two things:

  1. close the reader (important!)
  2. put a sentinel value on the queue's buffer; the background thread exits when it encounters this value (see the sketch below)
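
Step 2 by itself is harmless: once the buffer is empty, the background thread only sees the sentinel and exits cleanly, so close() followed by join_thread() returns without error (a minimal sketch):

import multiprocessing

q = multiprocessing.Queue()
q.put(0)
q.get()          # drains the queue, so the background thread's buffer is empty

q.close()        # the background thread only sees the sentinel and exits
q.join_thread()  # returns without error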

In the single-process case, the reason queue.close() does not work is step 1: if there is still data in the buffer, the background thread continues writing to an already closed socket, which leads to the Broken pipe error.

A simple example to demonstrate the error:

import socket

reader, writer = socket.socketpair()
writer.send("1")

# queue.close() will internally call reader.close()
reader.close()

# got a Broken pipe error
writer.send("2")

In the multi-process case, closing the reader in the main process only decrements the reference count of the underlying socket (the main and child processes share the socket); it does not really close (or shut down) the socket.
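
For instance, with a consumer process holding its own copy of the reader, the close() + join_thread() pattern from the question works as intended (a minimal sketch):

import multiprocessing

def consumer(q):
    for _ in range(10):
        print(q.get())

def main():
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=consumer, args=(q,))
    p.start()
    for i in range(10):
        q.put(i)
    q.close()        # closes only the parent's reader
    q.join_thread()  # safe: the child's reader keeps the socket open,
                     # so the background thread can flush the buffer
    p.join()

if __name__ == "__main__":
    main()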

Jacky1205

Program:

import multiprocessing

def main():
    q = multiprocessing.Queue()
    q.put(0)

if __name__ == '__main__':
    main()

Output:

Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 251, in _feed
    send_bytes(obj)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 205, in send_bytes 
    self._send_bytes(m[offset:offset + size])
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 416, in _send_bytes 
    self._send(header + buf)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/connection.py", line 373, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

A BrokenPipeError exception is raised when the queue thread of a multiprocessing.Queue still sends enqueued items to the write end of the queue pipe after the read end of the queue pipe has been closed. The read end is closed automatically during its garbage collection, which follows the garbage collection of the queue itself; the write end of the queue pipe is not garbage collected at that point because it is still referenced by the queue thread.

I think this is a bug, so I have opened a pull request on GitHub.

A workaround is to make sure that no enqueued items are left for the queue thread to send when the queue is garbage collected, by dequeuing all enqueued items first:

import multiprocessing

def main():
    q = multiprocessing.Queue()
    q.put(0)
    q.get()

if __name__ == '__main__':
    main()
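
This works because get() reads from the pipe, so it cannot return until the queue thread has pickled the item and written it to the write end; once it returns, the internal buffer is empty and there is nothing left for the queue thread to send when the queue is garbage collected.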
Géry Ogam