I was looking for example code that iterates over a process's output incrementally while the process consumes its input from a provided iterator (also incrementally). Basically:
import string
import random
# That's what I consider a very useful function, though didn't
# find any existing implementations.
def process_line_reader(args, stdin_lines):
    # args - command to run, same as subprocess.Popen
    # stdin_lines - iterable with lines to send to process stdin
    # returns - iterable with lines received from process stdout
    # NOTE: deliberate placeholder stub -- it states the desired contract
    # only; a working implementation appears further below in this post.
    pass
# Returns iterable over n random strings. n is assumed to be infinity if negative.
# Just an example of function that returns potentially unlimited number of lines.
def random_lines(n, M=8):
    """Yield n random M-character ASCII strings (endless when n < 0)."""
    while 0 != n:
        # string.ascii_letters instead of string.letters: the latter was
        # removed in Python 3 and is locale-dependent on Python 2.
        yield "".join(random.choice(string.ascii_letters) for _ in range(M))
        if 0 < n:
            # Negative n never reaches 0, giving an unbounded generator.
            n -= 1
# That's what I consider to be a very convenient use case for
# function proposed above.
def print_many_uniq_numbered_random_lines():
    """Demo: pipe a huge stream of random lines through `uniq -i`,
    printing numbered output lines as they arrive."""
    # Key idea here is that `process_line_reader` will feed random lines into
    # `uniq` process stdin as lines are consumed from returned iterable.
    for i, line in enumerate(process_line_reader(["uniq", "-i"], random_lines(100500 * 9000))):
        # Parenthesized print works on both Python 2 (expression) and
        # Python 3 (function call); the bare print statement is 2-only.
        print("#%i: %s" % (i, line))
Some of the solutions suggested here accomplish this with threads (which is not always convenient) or with asyncio (which is not available in Python 2.x). Below is an example of a working implementation.
import errno
import fcntl
import os
import select
import subprocess
class nonblocking_io(object):
    """Owns a duplicated file descriptor switched into non-blocking mode.

    Accepts an integer fd or any file-like object with a fileno() method;
    the original descriptor/object is closed and a private duplicate kept,
    so O_NONBLOCK never leaks onto a descriptor the caller still uses.
    """
    def __init__(self, f):
        self._fd = -1
        if isinstance(f, int):
            self._fd = os.dup(f)
            os.close(f)
        elif hasattr(f, "fileno"):
            # Duck-typing instead of `type(f) is file`: the `file` builtin
            # does not exist on Python 3, and this also accepts io objects.
            self._fd = os.dup(f.fileno())
            f.close()
        else:
            raise TypeError("Only accept file objects or integer file descriptors")
        # Enable O_NONBLOCK so reads/writes never stall the select() loop.
        flag = fcntl.fcntl(self._fd, fcntl.F_GETFL)
        fcntl.fcntl(self._fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.close()
        return False  # never suppress exceptions
    def fileno(self):
        # Lets instances be passed directly to select.select().
        return self._fd
    def close(self):
        # Idempotent: safe to call more than once.
        if 0 <= self._fd:
            os.close(self._fd)
            self._fd = -1
class nonblocking_line_writer(nonblocking_io):
    """Feeds lines from an iterable into a non-blocking descriptor.

    Lines are pulled lazily from `lines`, encoded, joined with `linesep`
    and staged in an internal buffer; call continue_writing() whenever
    select() reports the descriptor writable.
    """
    def __init__(self, f, lines, autoclose=True, buffer_size=16*1024, encoding="utf-8", linesep=os.linesep):
        super(nonblocking_line_writer, self).__init__(f)
        self._lines = iter(lines)
        self._lines_ended = False        # True once `lines` is exhausted
        self._autoclose = autoclose      # close the fd when all data is written
        self._buffer_size = buffer_size  # target size of one buffer refill
        self._buffer_offset = 0          # bytes of _buffer already written
        self._buffer = bytearray()
        self._encoding = encoding
        self._linesep = bytearray(linesep, encoding)
    # Returns False when `lines` iterable is exhausted and all pending data is written
    def continue_writing(self):
        while True:
            # Step 1: drain whatever is still pending in the buffer.
            if self._buffer_offset < len(self._buffer):
                n = os.write(self._fd, self._buffer[self._buffer_offset:])
                self._buffer_offset += n
                if self._buffer_offset < len(self._buffer):
                    # Partial write: the pipe is full, retry after next select().
                    return True
            if self._lines_ended:
                if self._autoclose:
                    self.close()
                return False
            # Step 2: buffer fully flushed; refill it from the iterator.
            self._buffer[:] = []
            self._buffer_offset = 0
            while len(self._buffer) < self._buffer_size:
                # NOTE(review): None is used as the exhaustion sentinel, so a
                # None *value* yielded by `lines` would end the stream early —
                # callers must not yield None lines; verify against callers.
                line = next(self._lines, None)
                if line is None:
                    self._lines_ended = True
                    break
                self._buffer.extend(bytearray(line, self._encoding))
                self._buffer.extend(self._linesep)
class nonblocking_line_reader(nonblocking_io):
    """Incrementally reads decoded text lines from a non-blocking descriptor.

    Call continue_reading() whenever select() reports the descriptor
    readable; it returns every complete line available so far.
    """
    def __init__(self, f, autoclose=True, buffer_size=16*1024, encoding="utf-8"):
        super(nonblocking_line_reader, self).__init__(f)
        self._autoclose = autoclose      # close the fd on EOF
        self._buffer_size = buffer_size  # max bytes per os.read() call
        self._encoding = encoding
        self._file_ended = False
        # Partial (unterminated) line carried over between reads. Kept as
        # bytes: os.read() returns bytes, and the original str ("") version
        # broke on Python 3 (str + bytes, str.decode). On Python 2 the two
        # are identical.
        self._line_part = b""
    # Returns (lines, more) tuple, where lines is iterable with lines read and more will
    # be set to False after EOF.
    def continue_reading(self):
        lines = []
        while not self._file_ended:
            data = os.read(self._fd, self._buffer_size)
            if 0 == len(data):
                # A zero-length read means EOF on the pipe.
                self._file_ended = True
                if self._autoclose:
                    self.close()
                if 0 < len(self._line_part):
                    # Flush the final unterminated line.
                    lines.append(self._line_part.decode(self._encoding))
                    self._line_part = b""
                break
            for line in data.splitlines(True):
                self._line_part += line
                # Bytes tuple required: bytes.endswith() rejects str
                # arguments on Python 3.
                if self._line_part.endswith((b"\n", b"\r")):
                    lines.append(self._line_part.decode(self._encoding).rstrip("\n\r"))
                    self._line_part = b""
            if len(data) < self._buffer_size:
                # Short read: kernel buffer drained for now.
                break
        return (lines, not self._file_ended)
class process_line_reader(object):
    """Runs a subprocess, lazily feeding `stdin_lines` to its stdin while
    yielding lines read from its stdout.

    args - command to run, same as subprocess.Popen
    stdin_lines - iterable with lines to send to process stdin

    Use as an iterable or as a context manager; close() reaps the child.
    """
    def __init__(self, args, stdin_lines):
        self._p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self._reader = nonblocking_line_reader(self._p.stdout)
        self._writer = nonblocking_line_writer(self._p.stdin, stdin_lines)
        self._iterator = self._communicate()
    def __iter__(self):
        return self._iterator
    def __enter__(self):
        return self._iterator
    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.close()
        return False  # never suppress exceptions
    def _communicate(self):
        # Generator multiplexing stdout reads and stdin writes with select()
        # so that neither full pipe can deadlock the other.
        read_set = [self._reader]
        write_set = [self._writer]
        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(read_set, write_set, [])
            except select.error as e:
                # `as` syntax works on Python 2.6+ and Python 3; the original
                # `except select.error, e` is a syntax error on Python 3.
                # Retry when the call was merely interrupted by a signal.
                if e.args[0] == errno.EINTR:
                    continue
                raise
            if self._reader in rlist:
                stdout_lines, more = self._reader.continue_reading()
                for line in stdout_lines:
                    yield line
                if not more:
                    read_set.remove(self._reader)
            if self._writer in wlist:
                if not self._writer.continue_writing():
                    write_set.remove(self._writer)
        self.close()
    def lines(self):
        return self._iterator
    def close(self):
        # Idempotent cleanup: release both pipe ends and wait for the child
        # so no zombie process is left behind.
        if self._iterator is not None:
            self._reader.close()
            self._writer.close()
            self._p.wait()
            self._iterator = None