20

I'm working on a fairly large project in Python that requires one of the compute-intensive background tasks to be offloaded to another core, so that the main service isn't slowed down. I've come across some apparently strange behaviour when using multiprocessing.Queue to communicate results from the worker process. Using the same queue for both a threading.Thread and a multiprocessing.Process for comparison purposes, the thread works just fine but the process fails to join after putting a large item in the queue. Observe:

import threading
import multiprocessing

class WorkerThread(threading.Thread):
    def __init__(self, queue, size):
        threading.Thread.__init__(self)
        self.queue = queue
        self.size = size

    def run(self):
        self.queue.put(range(self.size))


class WorkerProcess(multiprocessing.Process):
    def __init__(self, queue, size):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.size = size

    def run(self):
        self.queue.put(range(self.size))


if __name__ == "__main__":
    size = 100000
    queue = multiprocessing.Queue()

    worker_t = WorkerThread(queue, size)
    worker_p = WorkerProcess(queue, size)

    worker_t.start()
    worker_t.join()
    print 'thread results length:', len(queue.get())

    worker_p.start()
    worker_p.join()
    print 'process results length:', len(queue.get())

I've seen that this works fine for size = 10000, but hangs at worker_p.join() for size = 100000. Is there some inherent size limit to what multiprocessing.Process instances can put in a multiprocessing.Queue? Or am I making some obvious, fundamental mistake here?

For reference, I am using Python 2.6.5 on Ubuntu 10.04.

Brendan Wood
  • I have the same issue here. Using `Queue` to return model weights (a list of >100,000 elements), and now it is stuck. – chapter09 Jul 23 '20 at 22:09

4 Answers

21

It seems the underlying pipe is full, so the feeder thread blocks on the write to the pipe (specifically, when trying to acquire the lock protecting the pipe from concurrent access).

Check this issue http://bugs.python.org/issue8237
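
In other words, drain the queue in the parent before joining. A minimal sketch reordering the tail of the question's script (Python 2, reusing the question's names):

    worker_p.start()
    # get() first: the child's feeder thread cannot finish flushing a
    # large item into the pipe until the parent starts reading it
    print 'process results length:', len(queue.get())
    # with the pipe drained, the feeder thread exits and join() returns
    worker_p.join()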

Maksym Polshcha
  • Thanks, that's exactly the issue I'm experiencing, and dequeuing in the parent thread before joining seems to work fine. – Brendan Wood Apr 05 '12 at 15:42
  • Thank you so much. Just swap these two lines: `worker_t.join()` and `print 'thread results length:', len(queue.get())`, so that the get() happens before the join. – Chau Pham Sep 30 '16 at 11:22
  • What's the reason to switch these two lines? It seems unreasonable to get something from the queue before calling the join method. – drbombe Mar 26 '21 at 21:06
4

By default the maxsize of a Queue is infinite, so the size of the item is not limited by the queue itself. The issue is ordering: worker_p is putting an item in the queue, and the queue must be drained before calling join. See the programming guidelines for details: https://docs.python.org/2/library/multiprocessing.html#programming-guidelines
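
The guideline "Joining processes that use queues" includes its own example of the correct ordering; a minimal sketch adapted from it (`f` here is a stand-in worker, not part of the question's code):

    from multiprocessing import Process, Queue

    def f(q):
        q.put('X' * 1000000)  # a large item

    if __name__ == '__main__':
        queue = Queue()
        p = Process(target=f, args=(queue,))
        p.start()
        obj = queue.get()  # drain the queue first...
        p.join()           # ...then join; the reverse order can deadlock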

3

The answer to python multiprocessing: some functions do not return when they are complete (queue material too big) implements what you probably mean by "dequeuing before joining": a parallel execution of an arbitrary set of functions whose return values get queued.

This allows results of any size to be put on the queues, so the limit you found does not get in the way.
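
A minimal sketch of that pattern (`_runner` and `run_parallel` are hypothetical names; the functions must be picklable, i.e. defined at module top level):

    from multiprocessing import Process, Queue

    def _runner(func, index, queue):
        # each worker tags its result with an index so the caller can
        # restore the original order after draining the queue
        queue.put((index, func()))

    def run_parallel(funcs):
        queue = Queue()
        procs = [Process(target=_runner, args=(f, i, queue))
                 for i, f in enumerate(funcs)]
        for p in procs:
            p.start()
        # drain one result per worker BEFORE joining, so no feeder
        # thread blocks on a full pipe however large the results are
        results = dict(queue.get() for _ in procs)
        for p in procs:
            p.join()
        return [results[i] for i in range(len(funcs))]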

CPBL
1

You can send at most about 18 exabytes per multiprocessing.Queue message.

multiprocessing connections use a length-prefixed protocol, which uses either 4 bytes (for messages under 2 GiB) or 12 bytes (a 4-byte marker plus an 8-byte unsigned length) to describe the length of the message.

Here's the code where the messages are sent:

https://github.com/python/cpython/blob/1815d8e64fd0bf9fc5fadc2af928a60e380a5c06/Lib/multiprocessing/connection.py#L397-L402

annotated here:

    n = len(buf)
    if n > 0x7fffffff:  # if the message length is longer than a signed 4 byte integer (2 gigabytes)
        pre_header = struct.pack("!i", -1)  # then send b'\xff\xff\xff\xff' to say this is a large message
        header = struct.pack("!Q", n)  # encode the message length as an 8 byte unsigned integer
        self._send(pre_header)
        self._send(header)
        self._send(buf)

see How big can a 64 bit unsigned integer be?
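
For reference, the arithmetic behind the 18 exabyte figure (a quick check, not part of the linked code):

    >>> 2 ** 64 - 1  # largest length the 8-byte unsigned header can describe
    18446744073709551615

which is roughly 18.4 (decimal) exabytes per message.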

Thomas Grainger