
I am trying to use the subprocess module in Python to communicate with a process that reads standard input and writes standard output in a streaming fashion. I want to have the subprocess read lines from an iterator that produces the input, and then read output lines from the subprocess. There may not be a one-to-one correspondence between input and output lines. How can I feed a subprocess from an arbitrary iterator that returns strings?

Here is some example code that gives a simple test case, and some methods I have tried that don't work for some reason or other:

#!/usr/bin/python
from subprocess import *
# A really big iterator
input_iterator = ("hello %s\n" % x for x in xrange(100000000))

# I thought that stdin could be any iterable, but it actually wants a
# filehandle, so this fails with an error.
subproc = Popen("cat", stdin=input_iterator, stdout=PIPE)

# This works, but it first sends *all* the input at once, then returns
# *all* the output as a string, rather than giving me an iterator over
# the output. This uses up all my memory, because the input is several
# hundred million lines.
subproc = Popen("cat", stdin=PIPE, stdout=PIPE)
output, error = subproc.communicate("".join(input_iterator))
output_lines = output.split("\n")

So how can I have my subprocess read from an iterator line by line while I read from its stdout line by line?

Ryan C. Thompson
  • how are you having your script run in the background? Or are you just not doing this? (I thought you were, from the wording of your question: "I am trying to use the subprocess module in Python to communicate with a process that reads standard input and writes standard output in a streaming fashion.") – Charlie Parker Feb 24 '19 at 19:07

4 Answers


The easy way seems to be to fork and feed the input handle from the child process. Can anyone elaborate on any possible downsides of doing this? Or are there python modules that make it easier and safer?

#!/usr/bin/python
from subprocess import *
import os

def fork_and_input(input, handle):
    """Send input to handle in a child process."""
    # Make sure input is iterable before forking
    input = iter(input)
    if os.fork():
        # Parent
        handle.close()
    else:
        # Child
        try:
            handle.writelines(input)
            handle.close()
        # An IOError here means some *other* part of the program
        # crashed, so don't complain here.
        except IOError:
            pass
        os._exit(0)

# A really big iterator
input_iterator = ("hello %s\n" % x for x in xrange(100000000))

subproc = Popen("cat", stdin=PIPE, stdout=PIPE)
fork_and_input(input_iterator, subproc.stdin)

for line in subproc.stdout:
    print line,
Ryan C. Thompson
    If you user `exit()` in the child process, `SystemExit` is raised. Should instead use [`os._exit(0)`](https://docs.python.org/2/library/os.html#os._exit) – crlb Jul 23 '15 at 14:37
    [use `Thread()` instead of `os.fork()`](http://stackoverflow.com/a/32331150/4279) for portability and to avoid various hard to debug issues. Here's an example of possible issues with `os.fork()`: [Locks in the standard library should be sanitized on fork](http://bugs.python.org/issue6721) – jfs Jan 03 '16 at 02:45
  • with the command I'm running opening the `Popen` blocks my whole script :( – Charlie Parker Feb 24 '19 at 18:53
  • @CharlieParker I'm not sure how to help you. Simply running `Popen` shouldn't block anything, since you haven't done any I/O with the subprocess yet. – Ryan C. Thompson Feb 24 '19 at 20:15
  • @RyanThompson actually, I discovered that calling `repr` on the output of my command was the issue. I'm not doing that anymore. Thanks! – Charlie Parker Feb 24 '19 at 20:56
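As the comments suggest, the same feed-in-the-background idea can be done with a thread instead of `os.fork()`, which is portable to Windows and avoids fork-related locking issues. A minimal Python 3 sketch under that assumption (the `feed` helper is my own name, and the iterator is shortened here for illustration):

```python
import threading
from subprocess import Popen, PIPE

def feed(handle, lines):
    """Write lines to handle from a background thread, then close it."""
    try:
        for line in lines:
            handle.write(line)
    except BrokenPipeError:
        pass  # the child exited early; nothing more to do
    finally:
        handle.close()

# A big-ish iterator of bytes (shortened from the question's 100000000)
input_iterator = (b"hello %d\n" % x for x in range(100000))

subproc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
writer = threading.Thread(target=feed, args=(subproc.stdin, input_iterator))
writer.start()

# The main thread is free to consume stdout while the writer thread feeds stdin
count = 0
for line in subproc.stdout:
    count += 1

writer.join()
subproc.wait()
print(count)
```

Closing `stdin` in the `finally` block is what signals EOF to the child; without it, the read loop over `subproc.stdout` would never terminate.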

To feed a subprocess's standard input from a Python iterator:

#!/usr/bin/env python3
from subprocess import Popen, PIPE

# input_iterator must yield bytes, e.g.:
input_iterator = (b"hello %d\n" % x for x in range(100000000))

with Popen("sink", stdin=PIPE, bufsize=-1) as process:
    for chunk in input_iterator:
        process.stdin.write(chunk)

If you want to read the output at the same time then you need threads or asyncio:

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def writelines(writer, lines):
    # NOTE: can't use writer.writelines(lines) here because it tries to write
    # all at once
    with closing(writer):
        for line in lines:
            writer.write(line)
            await writer.drain()

async def main():
    input_iterator = (b"hello %d\n" % x for x in range(100000000))
    process = await asyncio.create_subprocess_exec("cat", stdin=PIPE, stdout=PIPE)
    asyncio.ensure_future(writelines(process.stdin, input_iterator))
    async for line in process.stdout:
        sys.stdout.buffer.write(line)
    return await process.wait()

if sys.platform == 'win32':
    loop = asyncio.ProactorEventLoop()  # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
with closing(loop):
    sys.exit(loop.run_until_complete(main()))
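On Python 3.7+, the event-loop boilerplate above can be replaced with `asyncio.run()`, which also selects a subprocess-capable loop on Windows (3.8+). A condensed sketch of the same pattern, with a smaller iterator for illustration:

```python
import asyncio
from asyncio.subprocess import PIPE

async def pump(n_lines):
    process = await asyncio.create_subprocess_exec("cat", stdin=PIPE, stdout=PIPE)

    async def writelines(writer, lines):
        # Write line by line, draining so only a bounded amount is buffered
        for line in lines:
            writer.write(line)
            await writer.drain()
        writer.close()  # EOF for the child, so its output loop can finish

    lines = (b"hello %d\n" % x for x in range(n_lines))
    writer_task = asyncio.ensure_future(writelines(process.stdin, lines))

    count = 0
    async for _ in process.stdout:
        count += 1

    await writer_task
    await process.wait()
    return count

count = asyncio.run(pump(1000))
print(count)
```

The writer coroutine and the `async for` read loop run concurrently on the same event loop, so neither side can fill its pipe buffer and deadlock the other.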
jfs

Follow this recipe. It's an add-on to subprocess which supports asynchronous I/O. This still requires that your subprocess respond to each input line, or group of lines, with a portion of its output, though.

Nicola Musatti
    I can't guarantee that the program will produce output for every line of input. In fact, it probably won't. – Ryan C. Thompson Aug 01 '11 at 13:25
  • Sorry, I wasn't precise: what I meant was that your main process should be able to feed enough input to your subprocess for it to generate some output, read this output, feed the subprocess some more input, and so on in a loop. If this the case, the recipe pointed to by my link may help you. The main point is that your subprocess should be able to start generating output before it sees all the input. – Nicola Musatti Aug 01 '11 at 13:52
  • Hmm. There is potentially a sort step in my pipeline (depending on options), so it probably will not generate most of the output until it has received all the input. – Ryan C. Thompson Aug 01 '11 at 15:49

There is https://github.com/uktrade/iterable-subprocess (full disclosure: created by me) that can do this. For example:

from iterable_subprocess import iterable_subprocess

input_iterator = (("hello %s\n" % x).encode("utf-8") for x in range(100000000))

with iterable_subprocess(['cat'], input_iterator) as output:
    for chunk in output:
        print(chunk)

Note that this yields chunks of bytes rather than lines of strings, and the chunks are not necessarily split on line boundaries. To make an iterable of lines, you can integrate a variant of the answer at https://stackoverflow.com/a/70639580/1319998:

import io
from iterable_subprocess import iterable_subprocess

input_iterator = (("hello %s\n" % x).encode() for x in range(100000000))

class FileLikeObject(io.IOBase):
    def __init__(self, it):
        self.it = iter(it)
    def readable(self):
        return True
    def read(self, _):
        return next(self.it, b'')

with iterable_subprocess(['cat'], input_iterator) as output:
    for line in io.TextIOWrapper(FileLikeObject(output), newline="", encoding="utf-8"):
        print(line)
Michal Charemza