The pipe(7) Linux manual page specifies that a pipe has a limited capacity (65,536 bytes by default) and that writing to a full pipe blocks until enough data has been read from the pipe to allow the write to complete:
I/O on pipes and FIFOs

[…]

If a process attempts to read from an empty pipe, then read(2) will block until data is available. If a process attempts to write to a full pipe (see below), then write(2) blocks until sufficient data has been read from the pipe to allow the write to complete. Nonblocking I/O is possible by using the fcntl(2) F_SETFL operation to enable the O_NONBLOCK open file status flag.

[…]

Pipe capacity

A pipe has a limited capacity. If the pipe is full, then a write(2) will block or fail, depending on whether the O_NONBLOCK flag is set (see below). Different implementations have different limits for the pipe capacity. Applications should not rely on a particular capacity: an application should be designed so that a reading process consumes data as soon as it is available, so that a writing process does not remain blocked.

In Linux versions before 2.6.11, the capacity of a pipe was the same as the system page size (e.g., 4096 bytes on i386). Since Linux 2.6.11, the pipe capacity is 16 pages (i.e., 65,536 bytes in a system with a page size of 4096 bytes). Since Linux 2.6.35, the default pipe capacity is 16 pages, but the capacity can be queried and set using the fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations. See fcntl(2) for more information.
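As a concrete illustration of this behaviour, here is a minimal sketch (assuming Linux and Python 3.10 or later, which exposes fcntl.F_GETPIPE_SZ) that queries the capacity of a freshly created pipe and shows that once the pipe is full, a non-blocking write fails immediately where a blocking write would hang:

import fcntl
import os

r, w = os.pipe()
capacity = fcntl.fcntl(w, fcntl.F_GETPIPE_SZ)  # 65,536 bytes by default
print('pipe capacity:', capacity)

os.set_blocking(w, False)      # set the O_NONBLOCK flag on the write end
os.write(w, b'X' * capacity)   # fill the pipe completely
try:
    os.write(w, b'X')          # one more byte does not fit
except BlockingIOError:        # EAGAIN; a blocking write would hang here instead
    print('pipe is full')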
That is why the multiprocessing Python library documentation recommends making a consumer process empty each Queue object with Queue.get calls before its feeder threads are joined in the producer processes (implicitly with garbage collection or explicitly with Queue.join_thread calls):
Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
An example which will deadlock is the following:
from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()
A fix here would be to swap the last two lines (or simply remove the p.join() line).
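Applying that fix, i.e. emptying the queue before joining the producer process, the example becomes:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    obj = queue.get()           # empty the queue first ...
    p.join()                    # ... so joining the process cannot deadlock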
In some applications, a consumer process may not know how many items have been added to a queue by producer processes. In this situation, a reliable way to empty the queue is to make each producer process add a sentinel item when it is done and make the consumer process remove items (regular and sentinel items) until it has removed as many sentinel items as there are producer processes:
import multiprocessing

def f(q, e):
    while True:
        q.put('X' * 1000000)    # block the feeder thread (size > pipe capacity)
        if e.is_set():
            break
    q.put(None)                 # add a sentinel item

if __name__ == '__main__':
    start_count = 5
    stop_count = 0
    q = multiprocessing.Queue()
    e = multiprocessing.Event()
    for _ in range(start_count):
        multiprocessing.Process(target=f, args=(q, e)).start()
    e.set()                     # stop producer processes
    while stop_count < start_count:
        if q.get() is None:     # empty the queue
            stop_count += 1     # count the sentinel items removed
This solution uses blocking Queue.get calls to empty the queue. This guarantees that all items have been added to the queue and removed.
@DanH’s solution uses non-blocking Queue.get_nowait calls to empty the queue. The problem with that solution is that producer processes can still add items to the queue after the consumer process has emptied it, which will create a deadlock (the consumer process will wait for the producer processes to terminate, each producer process will wait for its feeder thread to terminate, and the feeder thread of each producer process will wait for the consumer process to remove the items added to the queue):
import multiprocessing.queues

def f(q):
    q.put('X' * 1000000)        # block the feeder thread (size > pipe capacity)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=f, args=(q,))
    p.start()
    try:
        while True:
            q.get_nowait()
    except multiprocessing.queues.Empty:
        pass                    # reached before the producer process adds the item to the queue
    p.join()                    # deadlock
Or a newly created producer process can fail to deserialise the Process object sent to it by the consumer process if the synchronisation resources of the queue that comes with it as an attribute have already been garbage collected, raising a FileNotFoundError:
import multiprocessing.queues

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    multiprocessing.Process(target=f, args=(q,)).start()
    try:
        while True:
            q.get_nowait()
    except multiprocessing.queues.Empty:
        pass                    # reached before the producer process deserialises the Process
Standard error:
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
  File "/usr/local/Cellar/python@3.9/3.9.12/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
    self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory