In my application I'm using pipes from the multiprocessing module to communicate between Python processes. Lately I've observed weird behaviour that depends on the size of the data I'm sending through them. According to the Python documentation, these pipes are based on Connection objects and should behave asynchronously, yet sometimes they get stuck at sending. If I enable full duplex on each connection, everything works fine, even though I'm not using any connection for both sending and receiving. Can anyone explain this behaviour?
- 100 floats, full duplex disabled: the code works and takes advantage of the asynchronous behaviour.
- 100 floats, full duplex enabled: the example works fine, as expected.
- 10000 floats, full duplex disabled: execution blocks forever, even though it was fine with the smaller data.
- 10000 floats, full duplex enabled: fine again.
Code (it's not my production code, it just illustrates what I mean):
    from collections import deque
    from multiprocessing import Process, Pipe
    from os import getpid

    from numpy.random import randn

    PROC_NR = 4
    DATA_POINTS = 100
    # DATA_POINTS = 10000


    def arg_passer(pipe_in, pipe_out, list_):
        my_pid = getpid()
        print("{}: Before send".format(my_pid))
        pipe_out.send(list_)
        print("{}: After send, before recv".format(my_pid))
        buf = pipe_in.recv()
        print("{}: After recv".format(my_pid))


    if __name__ == "__main__":
        # Pipe(False) returns (receive-only end, send-only end).
        pipes = [Pipe(False) for _ in range(PROC_NR)]
        # pipes = [Pipe(True) for _ in range(PROC_NR)]
        pipes_in = deque(p[0] for p in pipes)
        pipes_out = deque(p[1] for p in pipes)
        # Rotate in opposite directions so each process sends on one pipe and
        # receives on another: process i sends to process (i + 2) % PROC_NR.
        pipes_in.rotate(1)
        pipes_out.rotate(-1)
        data = [randn(DATA_POINTS) for _ in range(PROC_NR)]
        processes = [Process(target=arg_passer,
                             args=(pipes_in[i], pipes_out[i], data[i]))
                     for i in range(PROC_NR)]
        for proc in processes:
            proc.start()
        for proc in processes:
            proc.join()
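To narrow this down, a minimal diagnostic sketch can compare how many bytes each kind of connection accepts with no reader attached against roughly how large the pickled payload is. It assumes Python 3.5+ on a Unix-like OS, where `Connection.fileno()` is a plain file descriptor that `os.write()` and `os.set_blocking()` accept. The helper names `buffered_capacity` and `payload_size` are hypothetical, and it pickles a plain list of floats instead of a numpy array so it runs without numpy, so the byte counts only approximate what `send()` writes.

```python
import os
import pickle
from multiprocessing import Pipe


def buffered_capacity(duplex, chunk=4096):
    """Hypothetical helper: count the bytes that can be written into one
    end of Pipe(duplex) while nobody reads, until the OS reports that a
    further write would block. Assumes a Unix-like OS."""
    reader, writer = Pipe(duplex)
    fd = writer.fileno()
    os.set_blocking(fd, False)  # a full buffer now raises instead of hanging
    total = 0
    try:
        while True:
            try:
                total += os.write(fd, b"x" * chunk)
            except BlockingIOError:
                break  # buffer full: a blocking send() would wait here
    finally:
        writer.close()
        reader.close()
    return total


def payload_size(n_floats):
    """Hypothetical helper: approximate pickled size of n_floats floats,
    using a plain list as a stand-in for the numpy arrays in the question."""
    return len(pickle.dumps([float(i) for i in range(n_floats)]))


if __name__ == "__main__":
    for n in (100, 10000):
        print("{} floats -> about {} pickled bytes".format(n, payload_size(n)))
    print("Pipe(False) buffers {} bytes".format(buffered_capacity(False)))
    print("Pipe(True) buffers {} bytes".format(buffered_capacity(True)))
```

If the 10000-float payload turns out larger than what `Pipe(False)` reports but smaller than what `Pipe(True)` reports on the same machine, that would be consistent with every process blocking inside `send()` before any of them reaches `recv()`. The exact numbers depend on the OS and its settings.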