
Can the "standard" subprocess pipeline technique (e.g. http://docs.python.org/2/library/subprocess.html#replacing-shell-pipeline) be "upgraded" to two pipelines?

# How about
p1 = Popen(["cmd1"], stdout=PIPE, stderr=PIPE)
p2 = Popen(["cmd2"], stdin=p1.stdout)
p3 = Popen(["cmd3"], stdin=p1.stderr)
p1.stdout.close()  # Allow p1 to receive a SIGPIPE if p2 exits.
p1.stderr.close()
#p2.communicate()  # or p3.communicate()?

OK, it's actually a different use case, but the closest starting point seems to be the pipeline example. By the way, how does p2.communicate() in a "normal" pipeline drive p1? Here's the normal pipeline for reference:

# From Python docs
output=`dmesg | grep hda`
# becomes
p1 = Popen(["dmesg"], stdout=PIPE)
p2 = Popen(["grep", "hda"], stdin=p1.stdout, stdout=PIPE)
p1.stdout.close()  # Allow p1 to receive a SIGPIPE if p2 exits.
output = p2.communicate()[0]

I guess I'm ultimately interested in what kind of "process graphs" (or maybe just trees?) can communicate() support, but we'll leave the general case for another day.

Update: Here's the baseline functionality. Without communicate(), create two threads reading from p1.stdout and p2.stdout, and inject input from the main process via p1.stdin.write(). The question is whether we can drive a 1-source, 2-sink graph using just communicate().
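For reference, that threads-based baseline could be sketched like this (a minimal self-contained version; the child command and the stream names are stand-ins I chose, not from the question):

```python
import subprocess
import sys
import threading

# One source process; the main process feeds its stdin, and one reader
# thread drains each output stream so neither pipe can fill up and block.
source = subprocess.Popen(
    [sys.executable, "-c",
     "import sys\n"
     "for line in sys.stdin:\n"
     "    sys.stdout.write(line.upper())\n"
     "    print(len(line), file=sys.stderr)\n"],
    stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

def drain(stream, sink):
    for line in iter(stream.readline, b""):
        sink.append(line)
    stream.close()

out_lines, err_lines = [], []
threads = [threading.Thread(target=drain, args=(source.stdout, out_lines)),
           threading.Thread(target=drain, args=(source.stderr, err_lines))]
for t in threads:
    t.start()

source.stdin.write(b"hello\nworld\n")  # inject input from the main process
source.stdin.close()                   # signal EOF so the source exits
for t in threads:
    t.join()
source.wait()
```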

dan3
  • related: [How do I write stderr to a file while using “tee” with a pipe?](http://stackoverflow.com/q/692000/4279) – jfs Oct 28 '13 at 09:39
  • Thanks... but I'm looking to nail down communicate() behaviour in Python – dan3 Oct 28 '13 at 10:24
  • here's a reverse example: [output from several subprocesses is collected via a single pipe in a single thread](http://stackoverflow.com/a/9745864/4279) i.e., `n-source -> 1-sink` – jfs Nov 15 '13 at 02:19

2 Answers


You could use bash's process substitution:

from subprocess import check_call

check_call("cmd1 > >(cmd2) 2> >(cmd3)", shell=True, executable="/bin/bash")

It redirects cmd1's stdout to cmd2 and cmd1's stderr to cmd3.
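For instance, with concrete placeholder commands (the commands, temp files, and the short sleep are my own additions; the substituted processes finish asynchronously after check_call returns, hence the wait):

```python
import os
import tempfile
import time
from subprocess import check_call

tmpdir = tempfile.mkdtemp()
out_path = os.path.join(tmpdir, "out.txt")
err_path = os.path.join(tmpdir, "err.txt")

# stdout is upper-cased by one substituted process,
# stderr is captured verbatim by another.
check_call(
    "{ echo hello; echo oops >&2; } "
    "> >(tr a-z A-Z > %s) 2> >(cat > %s)" % (out_path, err_path),
    shell=True, executable="/bin/bash")

time.sleep(0.5)  # let the asynchronous substituted processes finish writing
print(open(out_path).read())  # HELLO
print(open(err_path).read())  # oops
```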

If you don't want to use bash, then the code in your question should work as is, e.g.:

#!/usr/bin/env python
import sys
from subprocess import Popen, PIPE
from textwrap import dedent

# generate some output on stdout/stderr
source = Popen([sys.executable, "-c", dedent("""
    from __future__ import print_function
    import sys
    from itertools import cycle
    from string import ascii_lowercase

    for i, c in enumerate(cycle(ascii_lowercase)):
        print(c)
        print(i, file=sys.stderr)
""")], stdout=PIPE, stderr=PIPE)

# convert input to upper case
sink = Popen([sys.executable, "-c", dedent("""
    import sys

    for line in sys.stdin:
        sys.stdout.write(line.upper())
""")], stdin=source.stdout)
source.stdout.close() # allow source to receive SIGPIPE if sink exits

# square input
sink_stderr = Popen([sys.executable, "-c", dedent("""
    import sys

    for line in sys.stdin:
        print(int(line)**2)
""")], stdin=source.stderr)
source.stderr.close() # allow source to receive SIGPIPE if sink_stderr exits

sink.communicate()
sink_stderr.communicate()
source.wait()
jfs
  • Won't the call `sink.communicate()` deadlock if `sink_stderr` fills up the underlying OS pipes? That was the whole point of the `communicate()` function -- it opens up 2 threads, if necessary, to read both `stdout` and `stderr`, avoiding blockage – dan3 Oct 28 '13 at 09:52
  • @dan3: `.communicate()` doesn't do anything interesting unless `PIPE` is used (and it is *not* used for `sink`, `sink_stderr` processes). You can replace it with a simple `.wait()` call here. I've used `.communicate()` for continuity with the original pipeline recipe and to support a more general case. You could use `Thread(target=sink_stderr.communicate, daemon=True).start()` *if* you use `stdout=PIPE` or `stderr=PIPE` for `sink_stderr` otherwise it is not necessary. – jfs Oct 28 '13 at 23:12
  • How about if `sink.communicate()` (which drives `source`) causes the `PIPE` between `source.stderr` and `sink_stderr.stdin` (which is not being read yet) to fill up? That will block `sink.communicate()`. – dan3 Oct 29 '13 at 05:35
  • There's also a typo in your answer: `source.stdout.close()` should refer to `sink` instead. – dan3 Oct 29 '13 at 05:35
  • @dan3: 1. `source.stdout.close()` is not a typo. Look at the original pipeline recipe. Make sure you understand why `p1.stdout.close()` is used 2. `sink = Popen(...)` and `sink_stderr = Popen(...)` calls use the redirection at a file descriptor level (at least on POSIX). There is no `PIPE` between `sink.stdin` and `source.stderr`. `stdin=PIPE` and `stdin=some_file_object` are completely different cases. As I've said, you can use even mere `sink.wait()`, `sink_stderr.wait()` calls instead of `.communicate()` in this case. – jfs Oct 29 '13 at 18:55
  • OK. I looked in `/proc/.../fd`. There are OS pipes between (source.stdout, sink_stdout.stdin) and (source.stderr, sink_stderr.stdin) (BTW I never said anything about `sink.stdin`, rather `sink_stderr.stdin`). However, as all processes run independently as soon as they are spawned, there is no need for communicate(). The problem arises when there are pipes to/from the main python process -- that's when you need `communicate()`. – dan3 Oct 30 '13 at 05:37
  • @dan3: you don't need `.communicate()` in this case (`.wait()` is enough). Provide a standalone code example if you think otherwise. The OS pipes you see are due to `source` is created using `stdout=PIPE, stderr=PIPE` (and some utility pipes that `subprocess` may use internally). `sink` sees something like this: [`os.dup2(source.stdout.fileno(), 0)`](http://goo.gl/4H2gp9) -- it redirects `source`'s stdout to `sink`'s stdin. As long as `sink` subprocess reads its stdin, nothing blocks.(`sink.communicate()` in another thread won't help `sink` to read its stdin: `sink.stdin is None`). – jfs Nov 05 '13 at 17:14
  • @dan3: btw, `sink` and `sink_stderr` are symmetrical in this case. Everything I've said about `sink` applies to `sink_stderr` if you replace `source.stdout` by `source.stderr`. It is obvious if you look at the command in the `check_call(.., shell=True)` call. – jfs Nov 05 '13 at 17:15
  • Of course, that's obvious. The crucial part was that .wait() is enough. I was ultimately interested in the general case of subprocess() graphs with multiple sinks, some of which communicate with the main process via std*=PIPE. But discussing this simple example ironed out some confusion. – dan3 Nov 05 '13 at 17:31

The solution here is to create a background thread per source stream, each of which reads the output of one process and writes it into the inputs of several processes:

targets = [...]  # list of processes as returned by Popen()
while True:
    line = p1.stdout.readline()
    if not line:  # readline() returns an empty string at EOF, not None
        break
    for p in targets:
        p.stdin.write(line)
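Wrapped in a thread and with placeholder commands, the fan-out might look like this (a sketch under my own assumptions, not the answer's exact code):

```python
import subprocess
import sys
import threading

# One source; its stdout lines are copied into the stdin of every target.
source = subprocess.Popen(
    [sys.executable, "-c", "print('a'); print('b')"],
    stdout=subprocess.PIPE)
targets = [subprocess.Popen(
    [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read())"],
    stdin=subprocess.PIPE) for _ in range(2)]

def fan_out():
    for line in iter(source.stdout.readline, b""):
        for p in targets:
            p.stdin.write(line)
    for p in targets:
        p.stdin.close()  # let the targets see EOF and exit

t = threading.Thread(target=fan_out)
t.start()
t.join()
for p in targets:
    p.wait()
source.wait()
```

The targets inherit the parent's stdout here, so their output interleaves; with stdout=PIPE on the targets you would need readers for those pipes as well, which is exactly the deadlock concern discussed in the other answer's comments.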
Aaron Digulla
  • I could have two threads reading from p2 and p3 (in the example in my question) and simply inject input via p1.stdin.write(). I'm not looking for something even *lower*-tech :) – dan3 Oct 28 '13 at 18:13