
In my application I'm using pipes from the multiprocessing module to communicate between Python processes. Lately I've observed a weird behaviour that depends on the size of the data I'm sending through them. According to the Python documentation these pipes are based on connection objects and should behave in an asynchronous manner, yet sometimes they get stuck on sending. If I enable full duplex on each connection everything works fine, even though I'm not using the connections for both sending and receiving. Can anyone explain this behaviour? These are the cases I tested:

  1. 100 floats, full duplex disabled
    The code works, taking advantage of the asynchronous behaviour.
  2. 100 floats, full duplex enabled
    The example works fine as expected.
  3. 10000 floats, full duplex disabled
    The execution is blocked forever even though it was fine with the smaller data.
  4. 10000 floats, full duplex enabled
    Fine again.

Code (it's not my production code, it just illustrates what I mean):

from collections import deque
from multiprocessing import Process, Pipe
from numpy.random import randn
from os import getpid

PROC_NR = 4
DATA_POINTS = 100
# DATA_POINTS = 10000


def arg_passer(pipe_in, pipe_out, list_):
    my_pid = getpid()
    print "{}: Before send".format(my_pid)
    pipe_out.send(list_)    # send own data to another process
    print "{}: After send, before recv".format(my_pid)
    buf = pipe_in.recv()    # receive data sent by another process
    print "{}: After recv".format(my_pid)


if __name__ == "__main__":
    pipes = [Pipe(False) for _ in range(PROC_NR)]
    # pipes = [Pipe(True) for _ in range(PROC_NR)]
    pipes_in = deque(p[0] for p in pipes)
    pipes_out = deque(p[1] for p in pipes)
    pipes_in.rotate(1)      # rotate the ends so that each process ends up
    pipes_out.rotate(-1)    # exchanging data with other processes, not with itself

    data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)]
    processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo]))
                 for foo in xrange(PROC_NR)]

    for proc in processes:
        proc.start()

    for proc in processes:
        proc.join()
Michal

1 Answer


First of all, it's worth noting the implementation of the multiprocessing.Pipe function...

def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe
    '''
    if duplex:
        s1, s2 = socket.socketpair()
        s1.setblocking(True)
        s2.setblocking(True)
        c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
        c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
        s1.close()
        s2.close()
    else:
        fd1, fd2 = os.pipe()
        c1 = _multiprocessing.Connection(fd1, writable=False)
        c2 = _multiprocessing.Connection(fd2, readable=False)

    return c1, c2

The difference is that half-duplex 'Pipes' use an anonymous pipe, whereas full-duplex 'Pipes' actually use a Unix domain socket, since anonymous pipes are unidirectional by nature.

I'm not sure what you mean by the term "asynchronous" in this context. If you mean "non-blocking I/O" then it's worth noting that both implementations use blocking I/O by default.
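
If you want to confirm the pipe-versus-socket distinction on your own machine, here's a quick sketch (Unix only, Python 2 to match the question) that inspects the file descriptor each variant of Pipe() hands back:

import os
import stat
from multiprocessing import Pipe

for duplex in (False, True):
    conn_a, conn_b = Pipe(duplex)
    mode = os.fstat(conn_b.fileno()).st_mode    # Connection objects expose fileno()
    if stat.S_ISFIFO(mode):
        kind = "anonymous pipe (FIFO)"
    elif stat.S_ISSOCK(mode):
        kind = "socket"
    else:
        kind = "something else"
    print "duplex={}: {}".format(duplex, kind)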


Secondly, it's worth noting the pickled size of the data you're trying to send...

>>> from numpy.random import randn
>>> from cPickle import dumps
>>> len(dumps(randn(100)))
2479
>>> len(dumps(randn(10000)))
237154

Thirdly, from the pipe(7) manpage...

Pipe Capacity

A pipe has a limited capacity. If the pipe is full, then a write(2) will block or fail, depending on whether the O_NONBLOCK flag is set (see below). Different implementations have different limits for the pipe capacity. Applications should not rely on a particular capacity: an application should be designed so that a reading process consumes data as soon as it is available, so that a writing process does not remain blocked.

In Linux versions before 2.6.11, the capacity of a pipe was the same as the system page size (e.g., 4096 bytes on i386). Since Linux 2.6.11, the pipe capacity is 65536 bytes.
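
If you'd rather measure that limit yourself than trust the manpage, a rough sketch (Linux, Python 2) is to put the write end of an anonymous pipe into non-blocking mode and write until the kernel refuses more data:

import errno
import fcntl
import os

r, w = os.pipe()
flags = fcntl.fcntl(w, fcntl.F_GETFL)
fcntl.fcntl(w, fcntl.F_SETFL, flags | os.O_NONBLOCK)   # make writes fail instead of block

written = 0
chunk = "x" * 1024
while True:
    try:
        written += os.write(w, chunk)
    except OSError, e:
        if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
            break                                      # pipe buffer is full
        raise

print "pipe capacity is roughly {} bytes".format(written)   # 65536 on Linux >= 2.6.11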


So, in effect, you've created a deadlock where all the subprocesses have blocked on the pipe_out.send() call, and none of them can receive any data from the other processes, because you're sending all 237,154 bytes of data in one hit, which has filled the 65,536 byte buffer.
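
To see the deadlock in isolation, without numpy or the ring of four processes, here's a minimal sketch in which two child processes each try to push ~100 KB through a half-duplex pipe before either of them reads; both block in send() and neither ever reaches recv():

from multiprocessing import Process, Pipe

def worker(pipe_in, pipe_out):
    pipe_out.send("x" * 100000)   # ~100 KB > 64 KB pipe buffer, so this blocks...
    print pipe_in.recv()[:10]     # ...and this line is never reached

if __name__ == "__main__":
    a_recv, a_send = Pipe(False)
    b_recv, b_send = Pipe(False)
    p1 = Process(target=worker, args=(a_recv, b_send))
    p2 = Process(target=worker, args=(b_recv, a_send))
    p1.start()
    p2.start()
    p1.join()                     # hangs forever; interrupt with Ctrl+C
    p2.join()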

You might be tempted just to use the Unix domain socket version, but the only reason it works at present is that a socket has a larger buffer than a pipe, and you'll find that solution also fails if you increase DATA_POINTS to 100,000.

The "quick n' dirty hack" solution is to break the data into smaller chunks for sending, but it's not good practice to rely on the buffers being a specific size.

A better solution would be to use non-blocking I/O on the pipe_out.send() call, although I'm not familiar enough with the multiprocessing module to determine the best way to achieve it using that module.

The pseudocode would be along the lines of...

while 1:
    if we have sent all data and received all data:
        break
    send as much data as we can without blocking
    receive as much data as we can without blocking
    if we didn't send or receive anything in this iteration:
        sleep for a bit so we don't waste CPU time
        continue

...or you can use the Python select module to avoid sleeping for longer than is necessary, but, again, integrating it with multiprocessing.Pipe might be tricky.
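
Connection objects do expose a fileno() method, so they can be passed straight to select.select(). As a sketch of what that buys you (using a hypothetical worker function, not the question's example), a parent process can wait on several half-duplex pipes at once and recv() from whichever one has data ready:

import select
from multiprocessing import Pipe, Process

def worker(conn, value):
    conn.send(value * 2)          # hypothetical "work": send one result back
    conn.close()

if __name__ == "__main__":
    read_ends = []
    for i in range(4):
        read_end, write_end = Pipe(False)
        Process(target=worker, args=(write_end, i)).start()
        read_ends.append(read_end)

    results = []
    while read_ends:
        ready, _, _ = select.select(read_ends, [], [])   # block until some pipe is readable
        for conn in ready:
            results.append(conn.recv())
            read_ends.remove(conn)

    print sorted(results)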

It's possible that the multiprocessing.Queue class does all this for you, but I've never used it before, so you'd have to do some experiments.
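
For what it's worth, a sketch of the question's example rewritten to use Queue might look like the following. It relies on Queue.put() handing the pickled data off to a background feeder thread, so the put() call itself returns straight away and every process can get on with its get(). Treat it as a starting point for those experiments rather than a tested fix.

from collections import deque
from multiprocessing import Process, Queue
from numpy.random import randn
from os import getpid

PROC_NR = 4
DATA_POINTS = 10000


def arg_passer(queue_in, queue_out, list_):
    my_pid = getpid()
    print "{}: Before put".format(my_pid)
    queue_out.put(list_)     # returns immediately; a feeder thread does the writing
    print "{}: After put, before get".format(my_pid)
    buf = queue_in.get()     # blocks only until another process's data arrives
    print "{}: After get".format(my_pid)


if __name__ == "__main__":
    queues = [Queue() for _ in range(PROC_NR)]
    queues_in = deque(queues)
    queues_out = deque(queues)
    queues_in.rotate(1)      # each process reads a queue written by another process

    data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)]
    processes = [Process(target=arg_passer, args=(queues_in[foo], queues_out[foo], data[foo]))
                 for foo in xrange(PROC_NR)]

    for proc in processes:
        proc.start()

    for proc in processes:
        proc.join()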

Aya