I would like to open several subprocesses and read from and write to their stdout/stdin whenever there is data available.
First try:
import subprocess, select, fcntl, os
p1 = subprocess.Popen("some command", shell=True, stdout=subprocess.PIPE)
p2 = subprocess.Popen("another command", shell=True, stdout=subprocess.PIPE)
def make_nonblocking(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
make_nonblocking(p1.stdout)
make_nonblocking(p2.stdout)
size = 10000
while True:
    inputready, outputready, exceptready = select.select([p1.stdout.fileno(), p2.stdout.fileno()], [], [])
    for fd in inputready:
        if fd == p1.stdout.fileno():
            data = p1.stdout.read(size)
            print "p1 read %d" % (len(data))
        elif fd == p2.stdout.fileno():
            data = p2.stdout.read(size)
            print "p2 read %d" % (len(data))
This kind of works. Making the file descriptors non-blocking means a read can return less than the full size instead of blocking, which is nice. Looking streams up by fileno is ugly but works (a dict would be better). Error handling isn't quite right, though: as soon as either command exits I get "IOError: [Errno 32] Broken pipe", and I'm not sure from where (the traceback points at one of the print statements, which seems bogus).
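For instance, the if/elif chain could become a dict keyed by fileno, and treating an empty read as end-of-file would handle a command exiting. A rough, untested sketch along those lines (the command strings are still placeholders):

import subprocess, select, fcntl, os

def make_nonblocking(f):
    flags = fcntl.fcntl(f, fcntl.F_GETFL)
    fcntl.fcntl(f, fcntl.F_SETFL, flags | os.O_NONBLOCK)

p1 = subprocess.Popen("some command", shell=True, stdout=subprocess.PIPE)
p2 = subprocess.Popen("another command", shell=True, stdout=subprocess.PIPE)

# map fileno -> (name, stream) so the dispatch loop stays generic
streams = {p1.stdout.fileno(): ("p1", p1.stdout),
           p2.stdout.fileno(): ("p2", p2.stdout)}
for name, stream in streams.values():
    make_nonblocking(stream)

size = 10000
while streams:
    inputready, _, _ = select.select(list(streams), [], [])
    for fd in inputready:
        name, stream = streams[fd]
        data = stream.read(size)
        if not data:
            # empty read means the command exited and closed its stdout
            print "%s finished" % name
            stream.close()
            del streams[fd]
        else:
            print "%s read %d" % (name, len(data))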
Extending this to writes as well raises a few problems. In theory, writing up to PIPE_BUF bytes to a file descriptor that select has reported ready for writing should be safe. That also kind of works, but it requires shuffling data between buffers so that writes can use a different block size from the reads (or perhaps keeping a fixed-size ring buffer and pausing reads whenever it is about to overflow).
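For example, here is roughly what I have in mind for wiring one reader to one writer: a flat string as the in-between buffer (a ring buffer would avoid the slicing), reads paused while too much is buffered, and "producer" and "consumer" as hypothetical placeholder commands. Untested sketch:

import os, subprocess, select, fcntl

def make_nonblocking(f):
    flags = fcntl.fcntl(f, fcntl.F_GETFL)
    fcntl.fcntl(f, fcntl.F_SETFL, flags | os.O_NONBLOCK)

PIPE_BUF = getattr(select, 'PIPE_BUF', 512)  # POSIX guarantees at least 512
MAX_BUFFERED = 64 * 1024                     # pause reading above this

producer = subprocess.Popen("producer", shell=True, stdout=subprocess.PIPE)
consumer = subprocess.Popen("consumer", shell=True, stdin=subprocess.PIPE)
make_nonblocking(producer.stdout)
make_nonblocking(consumer.stdin)

buf = ""      # read from producer but not yet written to consumer
eof = False
while not eof or buf:
    rlist = [producer.stdout] if not eof and len(buf) < MAX_BUFFERED else []
    wlist = [consumer.stdin] if buf else []
    rready, wready, _ = select.select(rlist, wlist, [])

    if rready:
        data = os.read(producer.stdout.fileno(), 65536)
        if data:
            buf += data
        else:
            eof = True                       # producer closed its stdout
    if wready:
        # at most PIPE_BUF bytes, so the write should not block
        n = os.write(consumer.stdin.fileno(), buf[:PIPE_BUF])
        buf = buf[n:]

consumer.stdin.close()                       # let the consumer see EOF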
Is there a cleaner way of accomplishing this? Perhaps threads, or some AIO-like call to check how much can be read/written without blocking, or ....
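By threads I mean something like the following sketch: one blocking reader thread per pipe, pushing chunks onto a Queue that the main loop drains (again with placeholder command strings):

import os, subprocess, threading, Queue

def reader(name, stream, queue):
    # os.read returns as soon as some data is available (or "" at EOF),
    # so this thread can block here without holding anything else up
    while True:
        data = os.read(stream.fileno(), 4096)
        queue.put((name, data))
        if not data:
            break

q = Queue.Queue()
procs = {}
for name, cmd in (("p1", "some command"), ("p2", "another command")):
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
    t = threading.Thread(target=reader, args=(name, p.stdout, q))
    t.daemon = True
    t.start()
    procs[name] = p

finished = 0
while finished < len(procs):
    name, data = q.get()
    if not data:
        finished += 1
        print "%s finished" % name
    else:
        print "%s read %d" % (name, len(data))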
Can anyone give a working example of reading from one subprocess and writing to another asynchronously?