
This is a follow-up question to @Omry Yadan's Dec/9/2018 answer at How do subprocess.Popen pipes work in Python?. I need to create a three-program pipeline and collect stderr and return codes from all three programs. My current solution (below), based on that Dec/9/2018 answer, hangs. The equivalent command pasted into the shell finishes quickly.

The amount of data being piped from stdout to stdin is in the megabyte range. The final stdout and the three stderrs are expected to be much smaller.

#!/usr/bin/env python3
from subprocess import Popen, PIPE

cmd1 = ["gzip", "-dc", "some_file.gz"]
cmd2 = ["filtering_program", "some", "arguments"]
cmd3 = ["collection_program", "some", "arguments"]

p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE)
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE, stderr=PIPE)
(outcome_stdout, outcome_stderr) = p3.communicate()
p1.wait()
p2.wait()

2 Answers


Choose your own adventure / tl;dr

  • Is your app using an async framework? See Async.
  • Can you afford to throw away stderr in the non-final stages? See Ignore intermediate stderr.
  • Can you afford to take a performance hit and switch to sequential instead of parallel execution? See Workaround intermezzo.
  • Looking for a performant general solution that retains intermediate stderr and executes the pipeline in parallel? See either Threads, or else Temporary files (if for some reason you'd prefer not to make your program multi-threaded).
  • Interested in learning more about how I/O multiplexing works under the hood? See Select. I'm emphatically not recommending this for practical use; the performance tends to be worse, for reasons discussed below. But it is instructive, and as the last alternative, it also contains a discussion of some additional bells and whistles that it might make sense to retrofit onto most of the other solutions.
  • Bonus: Benchmarks at the end.

A reproducible example

First of all, let's adapt the code you provided into a reproducible example, so that we can all try this at home :) Let's try to implement the Python equivalent of the shell pipeline seq 2000000 | tee /dev/stderr | head -n 1000000 | wc -l. Translating the original example:

from subprocess import Popen, PIPE

n = 2_000_000
cmds = [["seq", str(n)], ["tee", "/dev/stderr"], ["head", "-n", str(n // 2)], ["wc", "-l"]]
procs = []
prev_stdout = None
for cmd in cmds:
    procs.append(Popen(cmd, stdin=prev_stdout, stdout=PIPE, stderr=PIPE))
    prev_stdout = procs[-1].stdout
out, err = procs[-1].communicate()
for p in procs[:-1]:
    p.wait()
print(out)

The challenge here is that tee, in addition to sending its stdin onwards to its stdout, also copies it to its stderr, which no one is reading from. Once the limited size buffer provided by the OS fills up, tee stops accepting input (it blocks on trying to write to its stderr, whose buffer is full) and the whole pipeline deadlocks, hanging on the call to .communicate().

(Why I threw in a head in there will become apparent shortly.)

Ignore intermediate stderr if you don't need it

Based on how you phrased the question, I'm not quite sure you actually care about the stderr of any but the final process. If not, the simplest solution really is to just explicitly ignore it by sending it to /dev/null:

from subprocess import Popen, PIPE, DEVNULL

n = 2_000_000
cmds = [["seq", str(n)], ["tee", "/dev/stderr"], ["head", "-n", str(n // 2)], ["wc", "-l"]]
procs = []
prev_stdout = None
for cmd in cmds:

    # This bit changed:
    stderr = PIPE if cmd is cmds[-1] else DEVNULL
    procs.append(Popen(cmd, stdin=prev_stdout, stdout=PIPE, stderr=stderr))

    prev_stdout = procs[-1].stdout
out, err = procs[-1].communicate()
for proc in procs[:-1]:
    proc.wait()
print(out)

But this deadlocks too, albeit elsewhere -- on the call to .wait() this time. Why? As stipulated in the subprocess docs, we need to close all stdout read handles on the Python side but the final one, in order to allow a possible SIGPIPE to propagate. This is why we added a head command to the pipeline: it only reads the first half of the input and then closes its stdin, propagating a SIGPIPE upstream, forcing us to account for this possibility:

from subprocess import Popen, PIPE, DEVNULL

n = 2_000_000
cmds = [["seq", str(n)], ["tee", "/dev/stderr"], ["head", "-n", str(n // 2)], ["wc", "-l"]]
procs = []
prev_stdout = None
for cmd in cmds:
    stderr = PIPE if cmd is cmds[-1] else DEVNULL
    procs.append(Popen(cmd, stdin=prev_stdout, stdout=PIPE, stderr=stderr))

    # This bit is new:
    if prev_stdout is not None:
        prev_stdout.close()

    prev_stdout = procs[-1].stdout
out, err = procs[-1].communicate()
for proc in procs[:-1]:
    proc.wait()
print(out)

This successfully avoids the deadlock and should print out b'1000000\n' for confirmation (seq outputted 2M lines, head let through only the first half of that, which wc -l counted), but it's not a fully general solution because chances are you do care about those other stderrs. Or at least some of us do.

Temporary files

The simplest solution if you want to keep the non-final stderrs (as measured by how much you need to know about various fancy programming techniques) is arguably to write them to temporary files instead of PIPE. In this case, the OS takes care of draining the buffers for you, dumping their contents to the files as needed. You could just use open() on a path of your choice, or use the tempfile module for a bit more safety.

Two options there: you can either use the high-level tempfile.TemporaryFile, which cleans up after itself, but it seems the cleanup can be quite slow depending on where the temp directory resides...

# tmpfile.py
from subprocess import Popen, PIPE
import tempfile

n = 2_000_000
cmds = [["seq", str(n)], ["tee", "/dev/stderr"], ["head", "-n", str(n // 2)], ["wc", "-l"]]
stderrs = [tempfile.TemporaryFile() for _ in range(len(cmds) - 1)] + [PIPE]
procs = []
prev_stdout = None
for cmd, stderr in zip(cmds, stderrs):
    procs.append(Popen(cmd, stdin=prev_stdout, stdout=PIPE, stderr=stderr))
    if prev_stdout is not None:
        prev_stdout.close()
    prev_stdout = procs[-1].stdout
out, err = procs[-1].communicate()
for proc, stderr in zip(procs[:-1], stderrs[:-1]):
    proc.wait()
    stderr.seek(0)
print(stderrs[1].read().count(b"\n"), out)

... or you can use the low-level tempfile.mkstemp, which leaves the cleanup up to you, so you can skip it for the time being to get a potential speedup, but this litters your temp directory with potentially large files you might need to remember to clean up at some later point:

# mkstemp.py
from subprocess import Popen, PIPE
import tempfile

n = 2_000_000
cmds = [["seq", str(n)], ["tee", "/dev/stderr"], ["head", "-n", str(n // 2)], ["wc", "-l"]]

# This bit changed:
stderrs = [open(tempfile.mkstemp()[0], "rb+") for _ in range(len(cmds) - 1)] + [PIPE]

procs = []
prev_stdout = None
for cmd, stderr in zip(cmds, stderrs):
    procs.append(Popen(cmd, stdin=prev_stdout, stdout=PIPE, stderr=stderr))
    if prev_stdout is not None:
        prev_stdout.close()
    prev_stdout = procs[-1].stdout
out, err = procs[-1].communicate()
for proc, stderr in zip(procs[:-1], stderrs[:-1]):
    proc.wait()
    stderr.seek(0)
print(stderrs[1].read().count(b"\n"), out)

In both cases, the output will be something like 1001608 b'1000000\n', the first number (= the number of lines in tee's stderr) being a bit higher as tee manages to copy a bit more from seq's output to its stderr before it exits due to the SIGPIPE from head.

Workaround intermezzo: sequential instead of parallel execution

I'm assuming one of your requirements is that the pipeline should run like a "real" shell pipeline, i.e. all processes should execute in parallel. This is often desirable when the input is large and/or the individual stages do complex processing, so there's a tangible benefit to allowing the processes later on in the pipeline to get started on the head of the data while the former stages are still grinding on the tail.

That being said, in situations where this is not the case, or where the slowdown penalty is acceptable, an easy workaround is to switch from parallel execution of subprocesses using non-blocking subprocess.Popen to sequential execution using blocking subprocess.run, as already suggested by one of the comments. On the plus side, this gives you nice additional benefits, like cleaning up the subprocesses upon exceptions, which you otherwise need to implement yourself (see Select for details).

# run.py
from subprocess import run

n = 2_000_000
cmds = [["seq", str(n)], ["tee", "/dev/stderr"], ["head", "-n", str(n // 2)], ["wc", "-l"]]
procs = []
prev_stdout = None
for cmd in cmds:
    procs.append(run(cmd, capture_output=True, input=prev_stdout))
    prev_stdout = procs[-1].stdout
print(procs[1].stderr.count(b"\n"), procs[-1].stdout)

Careful, in this case, procs does not hold subprocess.Popen instances (as above and below), but instances of subprocess.CompletedProcess, and procs[-1].stdout is the full actual output (bytes or str), rather than just a file handle. Popen just starts the process and gives control back to you; run waits for the process to finish and collects the output. That's why no .communicate() or .wait() is needed.

The output should be 2000000 b'1000000\n'. With the first number corresponding to the full count of lines outputted by seq, we see that under this approach, tee will always be able to copy all of its input to both stdout and stderr, even though head will subsequently only use the first half of it -- one of the inefficiencies of sequential execution.

Threads

While threads are often a way of adding deadlocks to otherwise perfectly working code, this time, it's the other way round! The gist is that we want to be able to call .communicate() on all Popen instances at the same time, so that they're all able to make progress together, reading from the various buffers as they fill up, all at once (or in rapid alternation). Threads let us do this:

# threads.py
from subprocess import Popen, PIPE
import threading

n = 2_000_000
cmds = [["seq", str(n)], ["tee", "/dev/stderr"], ["head", "-n", str(n // 2)], ["wc", "-l"]]
procs = []
prev_stdout = None
for cmd in cmds:
    procs.append(Popen(cmd, stdin=prev_stdout, stdout=PIPE, stderr=PIPE))
    if prev_stdout is not None:
        prev_stdout.close()
    prev_stdout = procs[-1].stdout

proc2out = {}
def communicate(proc):
    proc2out[proc] = proc.communicate()

threads = []
for proc in procs:
    threads.append(threading.Thread(target=communicate, args=(proc,)))
    threads[-1].start()
for thread in threads:
    thread.join()
print(proc2out[procs[1]][1].count(b"\n"), proc2out[procs[-1]][0])

Notice that we save the outputs to a proc2out dict keyed by the Popen instance, as the order in which the threads will finish is not guaranteed. Notice also we still need to do the proc.stdout.close() dance.

(Actually, I noticed this example doesn't consistently deadlock if we remove prev_stdout.close(). Sometimes it does, sometimes it completes but gives the wrong answer, with both numbers being in the lower couple hundreds of thousands instead of the expected 1M+ and 1M. Apparently, what happens is basically a fork in the road at each stdout: part of the output gets sent down the pipeline, part is captured by Python via Popen.communicate and doesn't make it through. At any rate, adding a second tee /dev/stderr after the first one seems to result in consistent deadlocks on every run again. The "wrong answer" outcome can be replicated even without threads, if you remove prev_stdout.close() and just sequentially call proc.communicate() on all processes.)

Anyway, the solution using threads is pretty fast, avoids writing anything to the filesystem, and you should probably use it. Unless you're using async.

Async

To be clear: I'm not suggesting you drag async into an otherwise non-async program just to solve this issue. Async opens up a whole different can of worms and it's not worth the added complexity if you don't need it. On the other hand, if you already are using it, then you probably realized in the previous section that running multiple I/O calls (i.e., .communicate() in our case) concurrently is exactly one of the things async was designed to do.

So in an async app, don't use subprocess + threads, but asyncio.subprocess (if you're using the stdlib asyncio module as your async runtime, that is) and standard async machinery:

# async.py
import os
import asyncio
from asyncio.subprocess import PIPE, create_subprocess_exec

async def main():
    n = 2_000_000
    cmds = [["seq", str(n)], ["tee", "/dev/stderr"], ["head", "-n", str(n // 2)], ["wc", "-l"]]
    procs = []
    prev_stdout = None
    for cmd in cmds:
        if cmd is cmds[-1]:
            read, write = None, PIPE
        else:
            read, write = os.pipe()
        procs.append(await create_subprocess_exec(*cmd, stdin=prev_stdout,
                                                  stdout=write, stderr=PIPE))
        if write is not PIPE:
            # This close is new, it was previously being taken care of automatically
            # when passing stdout=PIPE instead of a manually created file descriptor.
            os.close(write)
        if prev_stdout is not None:
            # This close is what we had before, just written slightly differently
            # because prev_stdout is now a raw file descriptor, i.e. just an integer
            # without a .close() method.
            os.close(prev_stdout)
        prev_stdout = read
    out = await asyncio.gather(*[proc.communicate() for proc in procs])
    print(out[1][1].count(b"\n"), out[-1][0])

asyncio.run(main())

The main gotcha here is that we can't just use proc.stdout to connect the previous process's stdout to the current process's stdin: under asyncio, proc.stdout is a StreamReader, not a file handle we could pass to the next create_subprocess_exec call. So we can't rely on PIPE, and we have to instead create the pipe manually with os.pipe(), which adds another bit of book-keeping: it becomes our job to close the write end of the pipe after we've passed it to the current stage's stdout (with PIPE, this happens automatically behind the scenes). This is in addition to closing the read end of the previous pipe once we've passed it to the current stage's stdin (which we've already been doing). Details in the linked post.

Otherwise, apart from cosmetic differences like the need to wrap the code in a main async function, this is conceptually very similar to what the threading approach does. asyncio.gather runs multiple tasks/coroutines to completion concurrently, and returns their results in order, so we don't even need a proc2out dict to keep track of which output belongs to which process. A more verbose but also more flexible and nicely readable alternative would be to use an asyncio.TaskGroup to run the tasks concurrently.
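
To illustrate, here's a minimal sketch of that TaskGroup variant. It's my own addition, not part of the original example, and it assumes Python 3.11+ (where asyncio.TaskGroup was introduced); it would replace the asyncio.gather line inside main() above:

    # Hypothetical replacement for the `out = await asyncio.gather(...)` line:
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(proc.communicate()) for proc in procs]
    # Once the `async with` block exits, all tasks are guaranteed to be done.
    out = [task.result() for task in tasks]
    print(out[1][1].count(b"\n"), out[-1][0])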

The performance tends to be towards the middle of the pack (see Benchmarks at the end), which is another reason not to introduce async just because you need to run this type of pipeline, unless you're already using it. Also, even when using async, you might still benefit from just ignoring the stderrs you don't actually need (see above).

Select

Select is an I/O notification mechanism which allows making progress on reading from multiple streams at once by switching between them as output becomes available. You register with the kernel to be notified when that is the case. (Writing is also supported if needed.)

Select is what subprocess.Popen.communicate uses under the hood to write stdin and read both stdout and stderr, all at the same time, while avoiding deadlocks. So we need to adapt this approach to alternate between reading stdout and stderr streams of multiple processes (our entire pipeline), not just one.

Since this is the final example, we'll also add a few more bells and whistles to make the code better behaved. They're described in more detail below; when using one of the previous approaches, consider adding these as appropriate. Without further ado:

# selector.py
import contextlib
import os
from subprocess import Popen, PIPE
import selectors

n = 2_000_000
cmds = [["seq", str(int(n))], ["tee", "/dev/stderr"], ["head", "-n", str(n // 2)], ["wc", "-l"]]
procs = []
file2out = {}

selector = selectors.DefaultSelector()
with contextlib.ExitStack() as estack:
    estack.enter_context(selector)

    try:
        prev_stdout = None
        for cmd in cmds:
            proc = estack.enter_context(Popen(cmd, stdin=prev_stdout, stdout=PIPE, stderr=PIPE))
            procs.append(proc)
            file2out[proc.stderr] = b""
            selector.register(proc.stderr, selectors.EVENT_READ)
            if cmd is cmds[-1]:
                file2out[proc.stdout] = b""
                selector.register(proc.stdout, selectors.EVENT_READ)
            if prev_stdout is not None:
                prev_stdout.close()
            prev_stdout = proc.stdout

        while selector.get_map():
            for key, _ in selector.select():
                if not (data := os.read(key.fd, 32_768)):
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                else:
                    file2out[key.fileobj] += data

    except:
        for proc in procs:
            proc.kill()
        raise

print(file2out[procs[1].stderr].count(b"\n"), file2out[procs[-1].stdout])

A few observations:

  • First of all, note how the data is read. We're using os.read(key.fd, N) instead of key.fileobj.read(N), because the latter blocks unless at least N bytes are available or EOF is reached. Since this is synchronous, single-threaded code, this can lead to deadlock (if upstream can't write any more data to the stream that's being read unless progress is made on reading other streams first). Conversely, the former will just return partial data if no more is available at the moment.

    Why N = 32,768 bytes? I lifted this directly from what the subprocess module does. It happens to be half the OS pipe buffer size, at least on Linux.

  • A second important point related to the fact that this is synchronous, single-threaded code, is that the .get_map()/.select() loops become a bottleneck for ingesting all of the data incoming from multiple streams at once. Python can only juggle between them so fast. This is already apparent in the benchmarks for this small (albeit admittedly write-heavy) pipeline below, and quickly becomes a deal breaker if you add a few more tee /dev/stderrs to the mix.

    Why? Because there's no such bottleneck in a corresponding shell pipeline with stderrs redirected to individual files, so the performance degradation in comparison would be unacceptable. To remain competitive, the Python solution needs to enable I/O in parallel. In practice, this means either temp files (which is basically a straightforward translation of the shell solution) or threads (arguably cleaner, keeps the data in memory rather than roundtripping it through the file system).

  • The call to .communicate() is gone. Instead, we've basically reimplemented a custom communication flow for streams from multiple processes.

  • We still mustn't forget about prev_stdout.close()!

  • As in the threads example, we use a dict to store the results, as there's no ordering guarantee -- we keep switching back and forth between different streams as output becomes available.

  • If you also want to pass input via the first process's stdin, you'll need to register it with selectors.EVENT_WRITE and handle a possible BrokenPipeError when writing to it.
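
    A hypothetical sketch of that stdin handling (my addition, assuming the first Popen is created with stdin=PIPE and that input_data is a bytes object holding everything you want to send), roughly mirroring how Popen.communicate interleaves writes with reads:

    import select  # only needed for select.PIPE_BUF below

    selector.register(procs[0].stdin, selectors.EVENT_WRITE)

    # ... and inside the `for key, _ in selector.select():` loop:
    if key.events & selectors.EVENT_WRITE:
        try:
            # Writing at most PIPE_BUF bytes per wakeup keeps os.write from blocking.
            written = os.write(key.fd, input_data[:select.PIPE_BUF])
            input_data = input_data[written:]
        except BrokenPipeError:
            input_data = b""  # the reader is gone, give up on the rest
        if not input_data:
            selector.unregister(key.fileobj)
            key.fileobj.close()
    else:
        ...  # the existing EVENT_READ handling shown above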

As to the bells and whistles, we wrap the pipeline in a try/except which makes sure to kill subprocesses upon unclean exit. Otherwise, they might keep running in the background, which is typically undesirable. This is directly inspired by what subprocess.run does and would make a sensible addition to the other solutions.

Note however that it doesn't cover exit by any signal other than those that Python already converts to exceptions by default, i.e. SIGINT (→ KeyboardInterrupt) and SIGPIPE (→ BrokenPipeError). The easiest way to cover e.g. SIGTERM as well would be to install a signal handler which converts SIGTERM exits to normal exits through raising a SystemExit via something like sys.exit(128 + signum).
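
A minimal sketch of that handler (my addition, not from the original code):

# Minimal sketch: turn SIGTERM into a SystemExit so that try/except/finally
# blocks and context managers still get a chance to clean up the subprocesses.
import signal
import sys

signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(128 + signum))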

If you can't live with converting a signal exit to a normal exit, you'll have to keep the subprocess handles somewhere accessible from the signal handler, terminate the processes explicitly, and signal.raise_signal(signum) once you're done.
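
A rough sketch of that variant (again my own addition; it assumes procs is reachable from the handler):

# Rough sketch: clean up the children, then re-raise the signal so the parent
# still appears to have died from SIGTERM rather than exiting normally.
import signal

def handle_sigterm(signum, frame):
    for proc in procs:
        proc.kill()
    signal.signal(signum, signal.SIG_DFL)  # restore the default action first,
    signal.raise_signal(signum)            # then actually die from the signal

signal.signal(signal.SIGTERM, handle_sigterm)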

And if you need to make sure subprocesses terminate promptly even when the parent receives an uncatchable signal like SIGKILL, there's prctl on Linux and Job objects on Windows.
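
For the Linux case, one hypothetical way to wire that up from Python (my sketch, not part of the original answer) is to set PR_SET_PDEATHSIG in the child via preexec_fn, e.g. using ctypes:

# Linux-only sketch: ask the kernel to send the child SIGTERM if the parent
# dies for any reason, SIGKILL included. PR_SET_PDEATHSIG is option 1 in
# <sys/prctl.h>.
import ctypes
import signal
from subprocess import Popen

PR_SET_PDEATHSIG = 1
libc = ctypes.CDLL("libc.so.6", use_errno=True)

def _die_with_parent():
    libc.prctl(PR_SET_PDEATHSIG, signal.SIGTERM)

proc = Popen(["sleep", "60"], preexec_fn=_die_with_parent)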

Finally, instead of explicitly calling .wait(), we use the Popen objects as context managers, which will take care of closing any remaining open file descriptors and waiting on the processes upon context exit. Since we want to be able to support an arbitrary number of stages in the pipeline, and the selector is also a context manager as it might have its own resources to clean up, we use a contextlib.ExitStack instead of typing them all out into the with statement, which would be inflexible and cumbersome.

Benchmarks

I ran a benchmark with hyperfine -N comparing all of the solutions which have a comment with a filename at the beginning. Those are the ones that are roughly functionally equivalent, i.e. they all capture tee's stderr. Here are the results on a Fedora 38 desktop (with temp dir on in-memory tmpfs):

Command                Mean [ms]     Min [ms]   Max [ms]   Relative
python3 tmpfile.py      31.5 ± 1.1    30.3       35.4      1.08 ± 0.05
python3 mkstemp.py      31.1 ± 1.3    29.9       36.7      1.06 ± 0.06
python3 run.py          91.3 ± 1.6    88.8       95.9      3.12 ± 0.12
python3 threads.py      29.2 ± 1.0    27.9       35.3      1.00
python3 async.py        51.6 ± 1.9    49.8       56.7      1.77 ± 0.09
python3 selector.py    151.3 ± 4.2   144.7      157.1      5.18 ± 0.23

And on an M1 MacBook Air (with temp dir on main SSD):

Command                Mean [ms]     Min [ms]   Max [ms]   Relative
python3 tmpfile.py      59.2 ± 2.4    56.0       70.0      1.38 ± 0.07
python3 mkstemp.py      57.2 ± 1.7    54.5       62.6      1.34 ± 0.05
python3 run.py         218.2 ± 1.9   215.5      220.8      5.11 ± 0.14
python3 threads.py      42.7 ± 1.1    40.6       45.2      1.00
python3 async.py        67.6 ± 2.1    64.6       78.7      1.58 ± 0.07
python3 selector.py     86.2 ± 2.5    83.4       95.2      2.02 ± 0.08

YMMV of course, and you should always run your own test in your particular situation if you care enough. But in general:

  • Just throwing threads at the problem (threads.py) is quick, easy and performant. Temporary files can be a compelling alternative if you want to avoid making your program multi-threaded for some reason.

  • However, be careful which temp file variant you pick: I also ran the benchmark on a Linux server (results not shown above) where tmpfile.py was consistently about twice as slow as mkstemp.py. I'm guessing that depending on the file system / storage device used for temporary storage (RBD in the server's case), the cleanup phase (which we just skip in the latter variant) can be quite a bit costlier.

  • With async.py, keep in mind that results will vary depending on the async framework/event loop implementation: the benchmark only covers the standard library's asyncio. Also, I was somewhat surprised to see it perform faster than selector.py. I'd thought both were doing basically the same thing (using select to synchronously alternate between reads to various streams), with async.py possibly having more overhead because it's a fully general framework with an event loop, as opposed to a hand-rolled case-specific solution.

    But no. Maybe asyncio applies some optimizations I didn't think of? Maybe it offloads the I/O switching to a C extension, so that it runs faster than our pure Python solution? Maybe it actually spawns threads under the hood to do the I/O in parallel?

dlukes

Note the following issues that are documented. Also note that the OS pipe buffer is limited to 65536 bytes by default on modern Linux, and on macOS it grows up to that size.

Popen.wait

Note: This will deadlock when using stdout=PIPE or stderr=PIPE and the child process generates enough output to a pipe such that it blocks waiting for the OS pipe buffer to accept more data. Use Popen.communicate() when using pipes to avoid that.

Popen.stderr

Warning: Use communicate() rather than .stdin.write, .stdout.read or .stderr.read to avoid deadlocks due to any of the other OS pipe buffers filling up and blocking the child process.
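
For a single child process, the pattern the docs are recommending looks roughly like this (a sketch of my own, not from the answer above, using the gzip command from the question):

# Rough sketch: communicate() drains stdout and stderr while waiting, so the
# OS pipe buffers can never fill up and block the child.
from subprocess import Popen, PIPE

proc = Popen(["gzip", "-dc", "some_file.gz"], stdout=PIPE, stderr=PIPE)
out, err = proc.communicate()  # replaces .stdout.read() + .wait()
print(proc.returncode, len(out), err)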

gdahlm
  • Thanks. I did see that note about potential deadlock, but it is not clear to me HOW to use Popen.communicate() to avoid deadlock. Should I be calling p1.communicate() and p2.communicate()? And in what order relative to the p*.wait() calls? – Rotwurg Nwossle Jul 10 '21 at 18:47
  • Are you aware of any better reference for subprocess than https://docs.python.org/3/library/subprocess.html? What I am trying to do seems like a natural use of the package, is trivial to run in the shell, and thus ought to have a simple Python solution. Yet I can't figure out from those docs how to accomplish it. – Rotwurg Nwossle Jul 10 '21 at 18:52
  • It depends on what you are trying to accomplish, as preferred style of development and the reasons for adding concurrency will change the answer. I would be tempted to just use run() in series first unless you have a reason to add complexity. The issues with deadlocks and concurrency, especially when using the lower-level Popen(), are far more complex than I can answer with a few 100 chars. Here is a video that will help you understand the pitfalls: https://youtu.be/Bv25Dwe84g0 Personally, I would use _async for loops_, because deadlocks are hard and the task is not truly parallel. – gdahlm Jul 10 '21 at 19:40
  • Thanks for that link. Indeed, I do plan to add concurrency but was hoping to get this working without it, since I expected it would be easier to debug a non-concurrent version. I am using run() for other commands in my script, but it deadlocks as soon as I try piping two commands together. Adding shell=True didn't help much — I believe the issue is the amount of data going from one process to the other. I hadn't considered using a series of single-command run()s — I presume this would mean the piped data is held entirely in Python objects. That may be the 'best' solution. Thanks! – Rotwurg Nwossle Jul 10 '21 at 21:44