I want to spawn multiple subprocesses and run them in parallel. I have a function which looks mostly like this:

import shlex
import subprocess
import sys
from threading import Thread

def stream_command(command):
    proc = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    while proc.poll() is None:
        line = proc.stdout.readline()
        sys.stdout.write('[%s]: %s' % (command, line))
    return proc.poll()

I can then run several of these in parallel (roughly) with this:

def stream_commands(commands):
    threads = []
    for command in commands:
        # pass the command as an argument so each thread gets its own value
        # (a bare lambda here would close over the loop variable)
        thread = Thread(target=stream_command, args=(command,))
        thread.start()
        threads.append(thread)
    while True:
        if any(t.is_alive() for t in threads):
            continue
        else:
            break

The issue, however, is that in my `stream_command` function I am blocking on the call to `proc.stdout.readline()`. That has a couple of consequences: first, if the process never writes to stdout, that function will hang forever (even if the subprocess terminates, for example). Second, I can't respond separately to the process's stdout and stderr (I would have to do a blocking read on one and then on the other, which would be very unlikely to work). What I would like to do is something akin to what I would write in node.js:

def stream_command(command):
    def on_stdout(line):
        sys.stdout.write('[%s]: %s' % (command, line))
    def on_stderr(line):
        sys.stdout.write('[%s (STDERR)]: %s' % (command, line))
    proc = asyncprocess.Popen(shlex.split(command),
            on_stdout=on_stdout,
            on_stderr=on_stderr
    )
    return proc.wait()

Where of course asyncprocess is some fictitious process module that lets me start subprocesses and pass handler functions for stdout and stderr.

So, is there anything akin to the asyncprocess module I have above, or failing that, is there any simple way to respond asynchronously to the events of a subprocess in python?

By the way, I should note that I'm using Python 2.7. There seems to be some stuff for Python 3 via the asyncio library, but unfortunately that doesn't work here, AFAIK.
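(For reference, the Python 3 asyncio approach would look roughly like the sketch below; asyncio's subprocess support needs Python 3.4+, which is exactly why it's no help on 2.7. The function name and details are only illustrative.)

import asyncio
import shlex
import sys

@asyncio.coroutine
def stream_command_async(command):
    proc = yield from asyncio.create_subprocess_exec(
        *shlex.split(command),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    @asyncio.coroutine
    def pump(pipe, label):
        while True:
            line = yield from pipe.readline()  # b'' once the pipe hits EOF
            if not line:
                break
            sys.stdout.write('[%s%s]: %s' % (command, label, line.decode()))

    yield from asyncio.gather(pump(proc.stdout, ''),
                              pump(proc.stderr, ' (STDERR)'))
    return (yield from proc.wait())

# e.g.:
# loop = asyncio.get_event_loop()
# loop.run_until_complete(asyncio.gather(
#     *[stream_command_async(c) for c in ['ls -l', 'df -h']]))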

limp_chimp
  • Do you want to process the output at all? If not, don't redirect stdout/err to pipes. – tdelaney Mar 04 '15 at 17:19
  • Yes, I do: as you can see, I want to prepend each line with the command the line was produced by, or with that command plus `(STDERR)` if the line is from stderr. And in general, I want to support arbitrary processing of the lines, in real-time (the actual `stream_command` function I'm writing takes a handler function to process the lines however the user desires) – limp_chimp Mar 04 '15 at 17:20
  • `proc.stdout.readline()` returns an empty string when the program terminates, so you don't have a problem. It's more common to do `for line in proc.stdout` than the `while proc.poll()` thing, but both work. But your existing code works. – tdelaney Mar 04 '15 at 17:32
  • ... except it doesn't work, for the reasons I describe. Perhaps I don't need to worry about `readline()` not returning, but the issue of separately and asynchronously responding to `stdout` and `stderr` remains. – limp_chimp Mar 04 '15 at 17:36
  • Oh, you were setting `stderr=subprocess.STDOUT` so I was confused. – tdelaney Mar 04 '15 at 17:45
  • you could pass file-like objects to [`teed_call()` function](http://stackoverflow.com/a/4985080/4279). Put into their `.write()` methods whatever you want to put into `on_stdX()` functions. – jfs Mar 05 '15 at 12:02
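To illustrate the last suggestion: a minimal sketch of a file-like object whose `write()` does the per-line handling, assuming `teed_call()` accepts any object with a `write()` method for its stdout/stderr arguments (the `LineHandler` class and its details are only illustrative):

class LineHandler(object):
    """Buffers writes and hands complete lines to a callback, so an
    instance can be passed wherever a writable file is expected."""
    def __init__(self, on_line):
        self.on_line = on_line
        self._buf = ''

    def write(self, data):
        self._buf += data
        while '\n' in self._buf:
            line, self._buf = self._buf.split('\n', 1)
            self.on_line(line + '\n')

    def flush(self):
        pass

# hypothetical usage with the teed_call() recipe linked above:
# teed_call(shlex.split(command),
#           stdout=LineHandler(on_stdout), stderr=LineHandler(on_stderr))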

1 Answer


You can do this with a thread per data stream. Assuming you want `stream_commands` to block until all of the commands complete, something like this works:

import shlex
import subprocess
import sys
import threading
from threading import Thread

stdout_lock = threading.Lock()

def pipe_to_stdout(preamble, pipe):
    # read the pipe line by line; the lock keeps the reader threads
    # from interleaving partial lines on stdout
    for line in pipe:
        with stdout_lock:
            sys.stdout.write(preamble + line)

def stream_commands(commands):
    threads = []
    procs = []
    try:
        for command in commands:
            proc = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            procs.append(proc)
            out_thread = Thread(target=pipe_to_stdout, args=('[stdout]: ', proc.stdout))
            err_thread = Thread(target=pipe_to_stdout, args=('[stderr]: ', proc.stderr))
            out_thread.start()
            err_thread.start()
            threads.append(out_thread)
            threads.append(err_thread)
    finally:
        for proc in procs:
            proc.wait()
        for thread in threads:
            thread.join()
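Called, for example, like this (the commands here are just placeholders):

stream_commands(['ping -c 3 localhost', 'ls -l /tmp'])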
tdelaney
  • Unfortunately that seems to block until the subprocess has finished before writing anything... :( – limp_chimp Mar 04 '15 at 18:37
  • Yeah, that's because subprocess pipes aren't pseudo-terminals. You can get around that by using `pty.openpty()` for the pipes or perhaps even look at the `pexpect` module. – tdelaney Mar 04 '15 at 18:41
  • OK but that still really doesn't answer my question, sorry. It's an important requirement that I can view the output in real-time (otherwise I could simply use `proc.communicate()`). I *am* able to do this with the technique I put in my initial question; I'm simply unable to separately respond to stdout and stderr. – limp_chimp Mar 04 '15 at 18:45
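For completeness, a rough Python 2.7 sketch of the pty approach mentioned above: giving the child a pseudo-terminal makes it line-buffer its stdout, so output shows up in real time, while stdout and stderr are still handled separately. The helper name and details are only illustrative, not part of the original answer.

import os
import pty
import select
import shlex
import subprocess
import sys

def stream_command_pty(command):
    # give the child a pseudo-terminal per stream so its output is
    # line-buffered instead of block-buffered
    out_master, out_slave = pty.openpty()
    err_master, err_slave = pty.openpty()
    proc = subprocess.Popen(shlex.split(command),
                            stdout=out_slave, stderr=err_slave)
    os.close(out_slave)
    os.close(err_slave)

    labels = {out_master: '[%s]: ' % command,
              err_master: '[%s (STDERR)]: ' % command}
    open_fds = list(labels)
    while open_fds:
        ready, _, _ = select.select(open_fds, [], [])
        for fd in ready:
            try:
                data = os.read(fd, 1024)
            except OSError:  # reading a closed pty raises EIO on Linux
                data = ''
            if not data:
                open_fds.remove(fd)
                os.close(fd)
                continue
            # note: this prefixes each chunk read, not strictly each line
            sys.stdout.write(labels[fd] + data)
    return proc.wait()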