34

I'm testing subprocess pipelines with Python. I'm aware that I can do what the programs below do in Python directly, but that's not the point. I just want to test the pipeline so I know how to use it.

My system is Linux Ubuntu 9.04 with the default Python 2.6.

I started with this documentation example.

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output

That works, but since p1's stdin is not being redirected, I have to type stuff in the terminal to feed the pipe. When I type ^D closing stdin, I get the output I want.

However, I want to send data to the pipe using a Python string variable. First I tried writing to stdin:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here

Didn't work. I tried using p2.stdout.read() on the last line instead, but it also blocks. I added p1.stdin.flush() and p1.stdin.close() but it didn't work either. Then I moved on to communicate():

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0] 

So that's still not it.

I noticed that running a single process (like p1 above, removing p2) works perfectly. And passing a file handle to p1 (stdin=open(...)) also works. So the problem is:

Is it possible to pass data to a pipeline of 2 or more subprocesses in Python, without blocking? If not, why not?

I'm aware I could run a shell and run the pipeline in the shell, but that's not what I want.


UPDATE 1: Following Aaron Digulla's hint below I'm now trying to use threads to make it work.

First I've tried running p1.communicate on a thread.

import threading

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here

Okay, that didn't work. I tried other combinations, like changing it to .write() and also p2.read(). Nothing. Now let's try the opposite approach:

def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()

The code ends up blocking somewhere: either in the spawned thread, or in the main thread, or both. So it didn't work. If you know how to make it work, it would help if you could provide working code. I'm trying here.


UPDATE 2

Paul Du Bois answered below with some information, so I did more tests. I've read the entire subprocess.py module and understood how it works. So I tried applying exactly that to my code.

I'm on Linux, but since I was testing with threads, my first approach was to replicate the exact Windows threading code seen in subprocess.py's communicate() method, but for two processes instead of one. Here's the entire listing of what I tried:

import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()

Well. It didn't work. Even after p1.stdin.close() was called, p2.stdout.read() still blocks.

Then I tried the POSIX code from subprocess.py:

import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

This also blocks, on select.select(). By spreading prints around, I found out this:

  • Reading is working. Code reads many times during execution.
  • Writing is also working. Data is written to p1.stdin.
  • At the end of numwrites, p1.stdin.close() is called.
  • When select() starts blocking, only to_read has something, p2.stdout. to_write is already empty.
  • os.read() call always returns something, so p2.stdout.close() is never called.

Conclusion from both tests: Closing the stdin of the first process in the pipeline (grep in the example) is not making it dump its buffered output to the next process and die.

No way to make it work?

PS: I don't want to use a temporary file; I've already tested with files and I know that works. And I don't want to use Windows.

nosklo
  • Closing stdin of `grep` **must** make it dump its output. If that doesn't happen, something is really, really broken. – Aaron Digulla Oct 23 '09 at 10:59
  • This question and your solution below are an absolute treasure trove. Thank you for saving me hours of work - this kind of information is what makes stackoverflow such a fantastic resource. – Andrew Mar 19 '11 at 19:16

11 Answers

22

I found out how to do it.

It is not about threads, and not about select().

When I run the first process (grep), it creates two low-level file descriptors, one for each pipe. Let's call those a and b.

When I run the second process, b gets passed to cut's stdin. But there is a brain-dead default on Popen - close_fds=False.

The effect of that is that cut also inherits a. So grep can't die even if I close a, because that fd is still open in cut's process (and cut ignores it).

The following code now runs perfectly.

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds=True SHOULD BE THE DEFAULT on Unix systems. On Windows it closes all fds, so it prevents piping.

EDIT:

PS: For people with a similar problem reading this answer: as pooryorick said in a comment, this could also block if the data written to p1.stdin is bigger than the pipe buffers. In that case you should chunk the data into smaller pieces and use select.select() to know when to read/write. The code in the question should give a hint on how to implement that.
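
For example, a rough sketch of that approach, combining the select() loop from the question with close_fds=True (the chunk size here is an arbitrary choice, kept small so a "writable" pipe really accepts the whole chunk without blocking):

from subprocess import Popen, PIPE
import os, select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data = 'hello world\n' * 100000      # much more than one pipe buffer
written = 0
to_read, to_write = [p2.stdout], [p1.stdin]
chunks = []

while to_read or to_write:
    readable, writable, _ = select.select(to_read, to_write, [])
    if readable:
        buf = os.read(p2.stdout.fileno(), 4096)
        if buf:
            chunks.append(buf)
        else:
            p2.stdout.close()
            to_read = []
    if writable:
        if written < len(data):
            # write small chunks so a "writable" pipe takes the whole chunk
            written += os.write(p1.stdin.fileno(), data[written:written + 512])
        else:
            p1.stdin.close()
            to_write = []

output = ''.join(chunks)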

EDIT2: Found another solution, with more help from pooryorick - instead of using close_fds=True and closing ALL fds, one can close the fds that belong to the first process when executing the second, and it will work. The closing must be done in the child, so the preexec_fn argument of Popen comes in very handy to do just that. When executing p2 you can do:

p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, preexec_fn=p1.stdin.close)
nosklo
  • close_fds=True is the default in python 3.3, but not 2.7 – ggg Dec 20 '12 at 10:45
  • A nicer alternative to `select` is to put the producer in a thread and then drive the pipeline by reading _incrementally_ from the output end of the pipeline, similar to [this example](http://stackoverflow.com/a/14026178/33208). – Jed Dec 24 '12 at 22:47
  • @Jed: The example you link doesn't answer the problem because it doesn't pipe the output of one subprocess to the input of another, which is what triggers the problem in 1st place. Running a single process as you did in the example works fine. If you have example of running two or more processes and pipe the output of one to the input of another without blocking, please provide it. I tried to adapt your example and it didn't work. Also, I can't see it as "nicer", why running a thread to wait for IO is "nicer"? Won't play nice with other stuff i.e. signals or `fork()`, so they're best avoided! – nosklo Dec 27 '12 at 13:49
  • 1. You don't say how you modified it, but a very simple modification works great. You should be able to figure it out from the [verbose answer](http://stackoverflow.com/a/14061132/33208). 2. `select` is lower level and not portable (Windows). Threads/processes allow you to not tangle your logic up into the select loop. You can use gevent when OS threads become too heavy-weight, but they are a fine solution here. – Jed Dec 27 '12 at 22:17
  • @Jed 1.. as I said I tried multiple modifications, and None of them work. ALL attempts block. Also I **already figured** out that the problem has absolutely nothing to do with threads, but with the way the fds are kept open when you fork another process. So you can't solve it by using threads, it is not solvable this way, the source of the problem lies elsewhere. I **challenge** you to provide me a code that works. I don't see why using a less-than-optimal solution is a fine solution, since I can use the optimal solution directly and it would scale better for any load. – nosklo Jan 11 '13 at 19:49
  • @Jed the only "very simple" modification that works is the `close_fds=True` parameter I mention in this answer, and then if you use it, you don't need threads, everything works. – nosklo Jan 11 '13 at 19:51
  • @nosklo Your "solution" does not work for large input sizes because `write()` blocks once the operating system's buffers fill. (Try writing a megabyte instead of one line.) I [already provided](http://stackoverflow.com/a/14061132/33208) you with complete working code that operates in constant memory independent of input size, along with an explanation of why it works. Your insistence that `close_fds=True` is a viable solution is deeply misleading. There is already too much confusion and misinformation on this topic. – Jed Jan 11 '13 at 20:32
  • @Jed yeah I'm aware that it won't work for large input sizes, however, as I have explained multiple times, that is not the problem. The code in the question hangs for even if I write **a single byte** so I'm asking about **another, unrelated problem** to system buffers being filled. I know how to solve the system buffer problem already and that is not what I'm asking about. I've already determined that the problem is that the fd of the first process is still open in the second process and that's what is hanging. Closing that single fd when launching the second process solves the problem. – nosklo Jan 17 '13 at 20:18
7

Working with large files

Two principles need to be applied uniformly when working with large files in Python.

  1. Since any IO routine can block, we must keep each stage of the pipeline in a different thread or process. We use threads in this example, but subprocesses would let you avoid the GIL.
  2. We must use incremental reads and writes so that we don't wait for EOF before starting to make progress.

An alternative is to use nonblocking IO, though this is cumbersome in standard Python. See gevent for a lightweight threading library that implements the synchronous IO API using nonblocking primitives.

Example code

We'll construct a silly pipeline that is roughly

{cat /usr/share/dict/words} | grep -v not              \
    | {upcase, filtered tee to stderr} | cut -c 1-10   \
    | {translate 'E' to '3'} | grep K | grep Z | {downcase}

where each stage in braces {} is implemented in Python while the others use standard external programs. TL;DR: See this gist.

We start with the expected imports.

#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading

Python stages of the pipeline

All but the last Python-implemented stage of the pipeline needs to go in a thread so that its IO does not block the others. These could instead run in Python subprocesses if you wanted them to actually run in parallel (avoid the GIL).

def writer(output):
    for line in open('/usr/share/dict/words'):
        output.write(line)
    output.close()
def filter(input, output):
    for line in input:
        if 'k' in line and 'z' in line: # Selective 'tee'
            sys.stderr.write('### ' + line)
        output.write(line.upper())
    output.close()
def leeter(input, output):
    for line in input:
        output.write(line.replace('E', '3'))
    output.close()

Each of these needs to be put in its own thread, which we'll do using this convenience function.

def spawn(func, **kwargs):
    t = threading.Thread(target=func, kwargs=kwargs)
    t.start()
    return t

Create the pipeline

Create the external stages using Popen and the Python stages using spawn. The argument bufsize=-1 says to use the system default buffering (usually 4 kiB). This is generally faster than the default (unbuffered) or line buffering, but you'll want line buffering if you want to visually monitor the output without lags.

grepv   = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut     = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)

twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

Drive the pipeline

Assembled as above, all the buffers in the pipeline will fill up, but since nobody is reading from the end (grepz.stdout), they will all block. We could read the entire thing in one call to grepz.stdout.read(), but that would use a lot of memory for large files. Instead, we read incrementally.

for line in grepz.stdout:
    sys.stdout.write(line.lower())

The threads and processes clean up once they reach EOF. We can explicitly clean up using

for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()

Python-2.6 and earlier

Internally, subprocess.Popen calls fork, configures the pipe file descriptors, and calls exec. The child process from fork has copies of all file descriptors in the parent process, and both copies will need to be closed before the corresponding reader will get EOF. This can be fixed by manually closing the pipes (either by close_fds=True or a suitable preexec_fn argument to subprocess.Popen) or by setting the FD_CLOEXEC flag to have exec automatically close the file descriptor. This flag is set automatically in Python-2.7 and later, see issue12786. We can get the Python-2.7 behavior in earlier versions of Python by calling

p._set_cloexec_flag(p.stdin)

before passing p.stdin as an argument to a subsequent subprocess.Popen.
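
A sketch of the same idea without the private method, using the standard fcntl module to set the flag on the first pipe before creating the second process:

import fcntl
from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)

# mark the parent's copy of p1.stdin close-on-exec, so the fd is not
# leaked into the second child when it exec()s
flags = fcntl.fcntl(p1.stdin, fcntl.F_GETFD)
fcntl.fcntl(p1.stdin, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('Hello World\n')
p1.stdin.close()
assert p2.stdout.read() == "Hello Worl\n"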

Jed
  • I don't want the data to go through python for each step. I want the data to pass directly from one process to another. I tried to adapt your code to my problem, but it still hangs. Here is my attempt: http://bpaste.net/show/NKX5FLaHCskMpO7YnwuS/ can you help me fix it so that it doesn't hang using your method? -- Note that, to reproduce the problem, you have to use the same environment I'm using, that is python 2.6 in old ubuntu 9.04. In python 2.7 recent ubuntu it has been fixed already and my original code (the one in the question) **already works**. – nosklo Jan 17 '13 at 20:35
  • Note that in my example, `grepk` goes directly to `grepz`. I was showing how to extend it to an arbitrary pipeline. I added an explanation of `FD_CLOEXEC` and [issue12786](http://bugs.python.org/issue12786), which I now understand is what you originally tripped over. The buffering problem is still an issue with your code and advising people to use `select` is needlessly confusing. If you add `p1._set_cloexec_flag(p1.stdin)` between the two `Popen` calls in your example, it will work with all Python-2.x. – Jed Jan 18 '13 at 17:21
  • thanks, that's great, `_set_cloexec_flag()` private function is yet another solution to the problem. This answer however is still answering a different question about large files, which is not the issue I asked in first place. I'm writing about a dozen bytes so I don't need to worry about buffers in this program. If you're willing edit it and remove the "big files" stuff and leave only the answer to my problem (the last part of the answer) I'll upvote it. Otherwise I'll just edit my answer and add the information you provided. Thanks for the link to the issue, it has been very clarifying. – nosklo Jan 21 '13 at 19:23
  • @nosklo I don't care about your upvote. I wrote this answer because there was too much misleading information and the selected answer is fragile and not easily extensible. It would probably be better to make a new question and move this answer there. – Jed Jan 21 '13 at 21:03
  • well, I don't know what's wrong with the selected answer. Before your last edit, it was the only answer that actually answers the question! The code the question/answer contains is example code to reproduce the problem, and to demonstrate the fix, not at all a complex example of how to deal with any files. I think it would be a good solution to make another question about big files or whatever and answer it there, and leave this question to be only about the blocking situation you get when the FD is not properly closed. – nosklo Jan 22 '13 at 20:50
3

There are three main tricks to making pipes work as expected

  1. Make sure each end of the pipe is used in a different thread/process (some of the examples near the top suffer from this problem).

  2. Explicitly close the unused end of the pipe in each process (see the sketch after this list).

  3. Deal with buffering by either disabling it (Python's -u option), using ptys, or simply filling up the buffer with something that won't affect the data (maybe '\n', but whatever fits).
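
For instance, a rough sketch of trick 2 applied to the pipeline from the question, closing the unused write end of grep's stdin inside cut's child (via preexec_fn, as in the selected answer's EDIT2) and closing the parent's leftover copies:

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
# close the write end of grep's stdin pipe inside cut's child process,
# where it is unused; otherwise grep never sees EOF
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE,
           preexec_fn=p1.stdin.close)
p1.stdout.close()          # the parent no longer needs its copy of this end
p1.stdin.write('Hello World\n')
p1.stdin.close()           # EOF for grep
print p2.stdout.read()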

The examples in the Python "pipeline" module (I'm the author) fit your scenario exactly, and make the low-level steps fairly clear.

http://pypi.python.org/pypi/pipeline/

More recently, I used the subprocess module as part of a producer-processor-consumer-controller pattern:

http://www.darkarchive.org/w/Pub/PythonInteract

This example deals with buffered stdin without resorting to using a pty, and also illustrates which pipe ends should be closed where. I prefer processes to threading, but the principle is the same. Additionally, it illustrates synchronized Queues which feed the producer and collect output from the consumer, and how to shut them down cleanly (look out for the sentinels inserted into the queues). This pattern allows new input to be generated based on recent output, allowing for recursive discovery and processing.

Poor Yorick
  • You don't need threads. It would be ridiculous to require threading for a simple thing as running a pipe. The problem is already solved, for months, in my answer - it was the `close_fds=True` that was causing the problem. – nosklo Feb 01 '10 at 11:04
  • You do need either threads or processes. Subprocess.Popen just does them under the hood so you don't see them. You initially had so much trouble because you didn't understand the principles of communicating with another process via pipes, which is why I posted examples that succinctly illustrate the nitty-gritty details. Your statement that close_fds should be the default on Unix systems makes it clear that you still don't understand pipes very well, or at least haven't thought through the possible scenarios. – Poor Yorick Feb 06 '10 at 01:44
  • Also, for all but the most trivial tasks, "communicate" is not an adequate mechanism for sending data into a pipe -- especially if you are trying to consume data from the other end. For a more robust solution, you'll need something like the queues from my second example. Also, also, the only reason your "solution" example works is that you were saved by buffering that you probably aren't even aware of. If you wrote more data to p1.stdin, it would hang again. People who see your solution example are going to go away misled about how to do this right. – Poor Yorick Feb 06 '10 at 01:44
  • My answer was just a simple example - If one needs to write more than what the buffers allow, then `select.select()` should be used to know exactly when it's possible to read or write without blocking, as I did in one of the attempts in the question. I still don't need threads, or processes, or queues for that. Just plain `select`, single process/thread. About `close_fds=True` - it should be the default because it leads to WTFs. `Popen` implementation, as you said, hides the details, so it shouldn't pass the `fd` to the child process **by default**. That should be done explicitly if one wants. – nosklo Feb 09 '10 at 03:41
  • Now it is clear that you also do not understand how to use select.select. See my explanation above. You're just down-voting my responses to protect your own. – Poor Yorick Feb 09 '10 at 16:43
  • I'm downvoting your responses, because they are **wrong**, as in, they don't provide a solution to the question. You say to use threads but that didn't work out when I tried - it blocks just the same without `close_fds=True`. Perhaps you want to provide code with threads that can make a pipe work using subprocess.Popen? Because that was the original question. Also, saying that people don't understand stuff is meaningless and annoying since you don't explain anything different, so you could just be quiet about it since you don't want to help. – nosklo Feb 10 '10 at 11:44
  • I didn't say to use threads. I prefer processes. At any rate, in just a minute, I'll post a version of your "solution" which does not use close_fds=True – Poor Yorick Feb 10 '10 at 20:31
  • I've addressed the solution in the comments section. In the future, please refrain from posting answers with content that should be a comment (because it doesn't answer the question). – nosklo Feb 11 '10 at 10:26
3

Nosklo's offered solution will quickly break if too much data is written into the pipe:


from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n' * 20000)
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

If this script doesn't hang on your machine, just increase "20000" to something that exceeds the size of your operating system's pipe buffers.

This is because the operating system is buffering the input to "grep", but once that buffer is full, the p1.stdin.write call will block until something reads from p2.stdout. In toy scenarios, you can get away with writing to and reading from a pipe in the same process, but in normal usage, it is necessary to write from one thread/process and read from a separate thread/process. This is true for subprocess.Popen, os.pipe, os.popen*, etc.

Another twist is that sometimes you want to keep feeding the pipe with items generated from earlier output of the same pipe. The solution is to make both the pipe feeder and the pipe reader asynchronous to the main program, and implement two queues: one between the main program and the pipe feeder and one between the main program and the pipe reader. PythonInteract is an example of that.
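
A rough sketch of that two-queue pattern, using threads here rather than the processes in the linked example (close_fds=True is assumed, per the selected answer, so the pipeline actually sees EOF on the asker's Python 2.6):

import threading, Queue
from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

in_q, out_q = Queue.Queue(), Queue.Queue()

def feeder():
    # pull items from the main program and push them into the pipeline
    while True:
        item = in_q.get()
        if item is None:            # sentinel: no more input
            p1.stdin.close()
            break
        p1.stdin.write(item)

def reader():
    # drain the pipeline and hand each line back to the main program
    for line in iter(p2.stdout.readline, ''):
        out_q.put(line)
    out_q.put(None)                 # sentinel: pipeline is finished

threading.Thread(target=feeder).start()
threading.Thread(target=reader).start()

in_q.put('hello world\n')
in_q.put('not this line\n')
in_q.put(None)
while True:
    line = out_q.get()
    if line is None:
        break
    # the main program could also feed new items into in_q based on this line
    print line,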

Subprocess is a nice convenience model, but because it hides the details of the os.popen and os.fork calls it does under the hood, it can sometimes be more difficult to deal with than the lower-level calls it utilizes. For this reason, subprocess is not a good way to learn about how inter-process pipes really work.

Poor Yorick
  • You can do all that from the same process/thread by using `select.select()` on a pipe. You really **don't** need threads, processes or queues. My answer didn't cover it for simplicity, but there's an example of how to do that, in the question. About the last paragraph - not everybody wants to learn about how inter-process pipes really work. They just want them to do their job and be out of the way. That's why higher level constructs are created like `subprocess.Popen`. They should work for the majority of common-use cases without require knowledge from the user - that's the **whole point**. – nosklo Feb 09 '10 at 03:33
  • Also, even when using threads, the script hangs, if you don't have `close_fds=True`. – nosklo Feb 09 '10 at 03:37
  • sorry, but wrong again. all you have to do to make your select.select example hang is to overflow the buffer in your "write" call: p1.stdin.write('hello world!\n' * 5000); p1.stdin.flush(). I challenge you to either post an example of using select.select to handle arbitrary input and pipe buffering from a single thread, or to stop spreading disinformation and vote up my answers since they correct your mistakes. Are you aware that every time you call Popen, you are creating one or more separate threads/processes. Your toy answers don't work in the real world and lead to wtf. – Poor Yorick Feb 09 '10 at 16:35
  • http://paste.pocoo.org/show/176123/ -> example that doesn't hang. It handles arbitrary input size, by writing it in chunks, and only when `select.select` says you can write, so it *never* blocks. In the example, **NO THREADS ARE CREATED, EVER** not even inside `Popen`. Of course a new process is created for each `Popen`, because, as I said earlier, **that's the whole point of `Popen`**. Now that your challenge is fulfilled, I now challenge you into providing a `subprocess.Popen` PIPE solution using threads, that doesn't block and doesn't use `close_fds=True` (which is the real solution here). – nosklo Feb 10 '10 at 12:16
  • The code you posted is fatally flawed. See the explanation in my answer to the posted code. Note that I've already fulfilled your challenge using os.fork rather than subprocess.Popen: http://pypi.python.org/pypi/pipeline/0.1. The same principles apply to subprocess.Popen – Poor Yorick Feb 10 '10 at 18:07
  • I fixed my code, the fixed code is in the other answer. I don't care if you fixed it on other library, the question is very specific about `subprocess.Popen` so if you don't know the answer please shut up. I will normally downvote all answers that don't answer the question in any way. – nosklo Feb 11 '10 at 10:19
2

You must do this in several threads. Otherwise, you'll end up in a situation where you can't send data: child p1 won't read your input since p2 doesn't read p1's output because you don't read p2's output.

So you need a background thread that reads what p2 writes out. That will allow p2 to continue after writing some data to the pipe, so it can read the next line of input from p1 which again allows p1 to process the data which you send to it.

Alternatively, you can send the data to p1 with a background thread and read the output from p2 in the main thread. But either side must be a thread.
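
A minimal sketch of the second variant - feeding p1 from a background thread and reading p2 in the main thread. Note that, as the question's updates show, on Python 2.6 this still needs close_fds=True (or one of the fd-closing tricks from the other answers) before grep ever sees EOF:

import threading
from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

def feed():
    p1.stdin.write('hello world\n' * 100000)   # more than one pipe buffer
    p1.stdin.close()

t = threading.Thread(target=feed)
t.start()
for line in p2.stdout:       # the main thread keeps draining the pipeline
    pass                     # ...or do something useful with each line
t.join()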

Aaron Digulla
  • Thanks, but that doesn't work. I've updated my question showing what I've tried. – nosklo Oct 20 '09 at 16:01
  • You can't use communicate() in this case. You must read and write the individual pipes. – Aaron Digulla Oct 21 '09 at 09:21
  • Aaron: I tried with .read() on the thread too, but it also blocks – nosklo Oct 22 '09 at 16:28
  • @nosklo `.read()` is a blocking call that tries to read all the way to `EOF` (when called without arguments). You need to read incrementally, as in `for line in output: ...`. – Jed Dec 24 '12 at 22:49
  • @Jed: I tried it too, but it still blocks. If you have code that works, please provide it in an answer. – nosklo Dec 27 '12 at 13:41
  • @nosklo Here's my detailed answer that works with large files. http://stackoverflow.com/a/14061132/33208 – Jed Dec 27 '12 at 20:57
  • @Jed Thanks, but I don't have problem with big files. Adapting your solution makes it hang even with a single byte. Check the comment on your answer for more details. – nosklo Jan 17 '13 at 21:06
2

Responding to nosklo's assertion (see other comments to this question) that it can't be done without close_fds=True:

close_fds=True is only necessary if you've left other file descriptors open. When opening multiple child processes, it's always good to keep track of open files that might get inherited, and to explicitly close any that aren't needed:

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p1.stdin.write('Hello World\n')
p1.stdin.close()
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds defaults to False because subprocess prefers to trust the calling program to know what it's doing with open file descriptors, and just provide the caller with an easy option to close them all if that's what it wants to do.

But the real issue is that pipe buffers will bite you for all but toy examples. As I have said in my other answers to this question, the rule of thumb is to not have your reader and your writer open in the same process/thread. Anyone who wants to use the subprocess module for two-way communication would be well-served to study os.pipe and os.fork, first. They're actually not that hard to use if you have a good example to look at.
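
For example, the bare os.pipe/os.fork pattern, with each process closing the pipe end it doesn't use, looks roughly like this:

import os

r, w = os.pipe()                  # r: read end, w: write end
pid = os.fork()
if pid == 0:                      # child: the reader
    os.close(w)                   # close the unused write end, or read() never sees EOF
    chunks = []
    while True:
        data = os.read(r, 1024)
        if not data:
            break
        chunks.append(data)
    os.close(r)
    print 'child read %d bytes' % len(''.join(chunks))
    os._exit(0)
else:                             # parent: the writer
    os.close(r)                   # close the unused read end
    os.write(w, 'hello world\n' * 3)
    os.close(w)                   # EOF for the child
    os.waitpid(pid, 0)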

Poor Yorick
  • Well, that's not really a pipe, since you're closing one side before starting the other process. Even then, if you change your code to write more data (like in `p1.stdin.write('Hello World\n' * 100000)`) it will block. You said you need the reader and writer in separate processes but you **still haven't provided any code** that does it successfully with `subprocess.Popen`. That was the question. If you know an answer, then please answer. If you don't, please stop saying nonsense as "community wiki". – nosklo Feb 11 '10 at 10:16
  • Which illustrates that you also don't understand what pipes are. And this code does *exactly* what your example does, so if it's not a pipe, neither is yours. The real answer to your question is that the subprocess module is inadequate when you want two-way communication with another process, particularly if future input to the pipe depends on observed output from the pipe. The beauty of shell pipes is that they don't hog memory by buffering output like subprocess does. You don't explicitly instantiate multiple processes with subprocess.Popen, because it does that under the hood with os.fork. – Poor Yorick Feb 11 '10 at 13:27
  • `Popen` forks, yes, but it uses `os.execvpe`, so in the end, a single new process is created for each `Popen`, as it should be. – nosklo Feb 12 '10 at 10:25
1

I think you may be examining the wrong problem. Certainly, as Aaron says, if you try to be both a producer to the beginning of a pipeline and a consumer of the end of the pipeline, it is easy to get into a deadlock situation. This is the problem that communicate() solves.

communicate() isn't exactly correct for you since stdin and stdout are on different subprocess objects; but if you take a look at the implementation in subprocess.py you'll see that it does exactly what Aaron suggested.

Once you see that communicate() both reads and writes, you'll see that in your second try, communicate() competes with p2 for the output of p1:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
# ...
p1.communicate('data\n')       # reads from p1.stdout, as does p2

I am running on win32, which definitely has different i/o and buffering characteristics, but this works for me:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.stdin.write('hello world\n' * 100000)
p1.stdin.close()
t.join()

I tuned the input size to produce a deadlock when using a naive unthreaded p2.read().

You might also try buffering into a file, e.g.:

import os, tempfile

fd, _ = tempfile.mkstemp()
os.write(fd, 'hello world\r\n' * 100000)
os.lseek(fd, 0, os.SEEK_SET)
p1 = Popen(["grep", "-v", "not"], stdin=fd, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
print p2.stdout.read()

That also works for me without deadlocks.

Paul Du Bois
  • Thanks!!! I checked `subprocess.py` and tried to do as you said, but it still blocks. I've updated my question. HELP!!! – nosklo Oct 23 '09 at 02:07
1

In one of the comments above, I challenged nosklo to either post some code to back up his assertions about select.select or to upvote my responses he had previously down-voted. He responded with the following code:

from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 100000 * 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = p2.stdout.read(1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

One problem with this script is that it second-guesses the size/nature of the system pipe buffers. The script would experience fewer failures if it could remove magic numbers like 1024.

The big problem is that this script only works consistently with the right combination of input data and external programs. grep and cut both work with lines, and so their internal buffers behave a bit differently. If we use a more generic command like "cat", and write smaller bits of data into the pipe, the fatal race condition will pop up more often:

from subprocess import Popen, PIPE
import select
import time

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    time.sleep(1)
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        print 'I am reading now!'
        data = p2.stdout.read(1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        print 'I am writing now!'
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            print 'closing file'
            p1.stdin.close()
            to_write = []

print b

In this case, two different results will manifest:

write, write, close file, read -> success
write, read -> hang

So again, I challenge nosklo to either post code showing the use of select.select to handle arbitrary input and pipe buffering from a single thread, or to upvote my responses.

Bottom line: don't try to manipulate both ends of a pipe from a single thread. It's just not worth it. See pipeline for a nice low-level example of how to do this correctly.

Poor Yorick
  • Your answers can't be upvoted, simply because they don't answer the question. I'm getting tired because you don't provide any `Popen` code that answers the question, and keep bragging nonsense about "you have to use threads". I've fixed my code bugs, http://paste.pocoo.org/show/176561/ by switching some statements around, and then improved it for easy testing. Now it tests **all** combinations of a lot of buffer sizes to read and write, and data sizes, repeating each test 50 times, and **all of them work fine**, again without using threads. So no "magic number". I'm still waiting your answer. – nosklo Feb 11 '10 at 10:33
  • That's a better example of select.select. It still contains an unnecessary close() function -- learning how to use os.fork and os.pipe is the best way to get a handle on the issues. The magic number is still there -- you've just parameterized it. This example cuts your effective buffer from (usually) 64k to 1k or even less, so performance will suffer. Next step: how would you feed the entire output back through the same pipe exactly one time? – Poor Yorick Feb 11 '10 at 15:16
  • The point about Popen is that it uses os.fork and os.exec, so you already *are* working with threads/processes -- it's just a matter of knowing how to manipulate the pipes. – Poor Yorick Feb 11 '10 at 15:16
  • Finally, select.select is more appropriate for multi-consumer/multi-producer scenarios. It's overly-complex for this scenario. For a much simpler solution (which does not use close_fds), see my os.fork answer to this question. – Poor Yorick Feb 11 '10 at 15:36
0

What about using a SpooledTemporaryFile? This bypasses (but perhaps doesn't solve) the issue:

http://docs.python.org/library/tempfile.html#tempfile.SpooledTemporaryFile

You can write to it like a file, but it's actually a memory block.

Or am I totally misunderstanding...
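
Something like this rough sketch, presumably - though, as the comment below points out, Popen needs a real file descriptor, so calling fileno() rolls the spool over to an actual temp file:

import tempfile
from subprocess import Popen, PIPE

spool = tempfile.SpooledTemporaryFile(max_size=1024 * 1024)
spool.write('Hello World\nnot this line\n')
spool.seek(0)

# Popen calls fileno() on its stdin argument, which forces the spool
# to roll over to a real temporary file on disk
p1 = Popen(["grep", "-v", "not"], stdin=spool, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
print p2.communicate()[0]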

Adam Nelson
  • `SpooledTemporaryFile` rolls over to a real file when you call its `fileno()` method (which Popen needs to do), so it is the same as using a file - defeats the purpose of using a pipeline in first place. :( – nosklo Oct 23 '09 at 10:31
-1

Here's an example of using Popen together with os.fork to accomplish the same thing. Instead of using close_fds it just closes the pipes at the right places. Much simpler than trying to use select.select, and takes full advantage of system pipe buffers.

from subprocess import Popen, PIPE
import os
import sys

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)

pid = os.fork()

if pid: #parent
    p1.stdin.close()
    p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE)
    data = p2.stdout.read()
    sys.stdout.write(data)
    p2.stdout.close()

else: #child
    data_to_write = 'hello world\n' * 100000
    p1.stdin.write(data_to_write)
    p1.stdin.close()
Poor Yorick
  • Hm, that makes sense. However it needlessly starts 3 new processes, not 2. So I would refrain from using this solution. – nosklo Feb 12 '10 at 10:23
  • Thanks for that answer, with help from it I was able to find another solution that creates only a single new process per `Popen`, and doesn't need `close_fds=True` - using `preexec_fn` to close fds in the forked child before `execvpe`. I've edited my answer. – nosklo Feb 13 '10 at 15:15
-1

It's much simpler than you think!

import sys
from subprocess import Popen, PIPE

# Pipe the command here. It will read from stdin.
#   So cat a file to stdin, like (cat myfile | ./this.py),
#     or type in the terminal and hit control+d when done, etc.
#   No need to handle this yourself; that's why we have shells!
p = Popen("grep -v not | cut -c 1-10", shell=True, stdout=PIPE)

nextData = None
while True:
    nextData = p.stdout.read()
    if nextData in (b'', ''):
        break
    sys.stdout.write ( nextData.decode('utf-8') )


p.wait()

This code is written for Python 3.6, and works with Python 2.7.

Use it like:

cat README.md  | python ./example.py

or

python example.py < README.md

To pipe the contents of "README.md" to this program.

But... at this point, why not just use "cat" directly, and pipe the output like you want? Like:

cat filename | grep -v not | cut -c 1-10

typed into the console will do the job as well. I personally would only use the code option if I were further processing the output; otherwise a shell script would be easier to maintain and retain.

You just use the shell to do the piping for you. In one, out the other. That's what shells are GREAT at doing: managing processes, and managing single-width chains of input and output. Some would call it a shell's best non-interactive feature.