
I have three commands that would otherwise be easily chained together on the command-line like so:

$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput

In other words, firstCommand processes foo from standard input and pipes the result to secondCommand, which in turn processes that input and pipes its output to thirdCommand, which does its own processing and writes its output to the file finalOutput.

I have been trying to recapitulate this in a Python script, using threading. I'd like to use Python in order to manipulate the output from firstCommand before passing it to secondCommand, and again between secondCommand and thirdCommand.

Here's an excerpt of code that does not seem to work:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.communicate()
second_process.communicate()
third_process.communicate()

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

When I run this, the script hangs:

$ echo foo | ./myConversionScript.py
** hangs here... **

If I hit Ctrl-C to terminate the script, the code is stuck on the line third_thread.join():

  C-c C-c
Traceback (most recent call last):
  File "./myConversionScript.py", line 786, in <module>
    sys.exit(main(*sys.argv))
  File "./myConversionScript.py", line 556, in main
    third_thread.join()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join
    self.__block.wait()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait
    waiter.acquire()
KeyboardInterrupt

If I don't use a third_process and third_thread, instead only passing data from the output of the first thread to the input of the second thread, there is no hang.

Something about the third thread seems to cause things to break, but I don't know why.

I thought the point of communicate() is that it will handle I/O for the three processes, so I'm not sure why there is an I/O hang.

How do I get three or more commands/processes working together, where one thread consumes the output of another thread/process?

UPDATE

Okay, I made some changes that seem to help, based on some comments here and on other sites. The processes are made to wait() for completion, and within the thread methods, I close() the pipes once the thread has processed all the data that it can. My concern is that memory usage will be very high for large datasets, but at least things are working:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))

first_thread.start()
second_thread.start()
third_thread.start()

first_thread.join()
second_thread.join()
third_thread.join()

first_process.wait()
second_process.wait()
third_process.wait()

# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)
    # close the pipe so that firstCommand sees end-of-file on its input
    to_stream.close()

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()
Alex Reynolds
  • related: [How do I use subprocess.Popen to connect multiple processes by pipes?](http://stackoverflow.com/q/295459/4279) – jfs Jan 08 '14 at 11:39
  • more similar examples: [Using POpen to send a variable to Stdin and to send Stdout to a variable](http://stackoverflow.com/q/20789427/4279) – jfs Jan 08 '14 at 11:43
  • more: [Python subprocess module, how do I give input to the first of series of piped commands?](http://stackoverflow.com/q/5080402/4279) – jfs Jan 08 '14 at 11:48
  • Thanks, J.F. Sebastian. However, the solution in your first comment does not work — the binaries act as if they do not receive any arguments. The second comment reflects the use of two binaries, not three as in my question. As I stated, I can use two binaries with the threading arrangement I describe above. I'm still working through to see how the third comment relates to my question. – Alex Reynolds Jan 08 '14 at 19:05
  • are you telling me that `subprocess.check_call('echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput', shell=True)` doesn't work? – jfs Jan 08 '14 at 19:06
  • That particular invocation might or might not work, but my use of `subprocess.Popen(...)` to gain control over `stdout` and `stdin`, as described above, does not work. – Alex Reynolds Jan 08 '14 at 19:11
  • update your question and provide the actual bash command that you are trying to emulate in Python. Use plain English if you can't provide the command. – jfs Jan 08 '14 at 19:15
  • I thought my question was clear, but I'll try to clarify: I want to run that command, but I need to capture chunks of output from one command, process them, and send them to the next command. So it would be something like: `echo foo | firstCommand - | somePythonRoutine - | secondCommand - | anotherPythonRoutine - | thirdCommand - > finalOutput`. The idea is to do all the I/O work entirely within one Python script instead of through several smaller scripts, and without the use of temporary files to store intermediate results by using `stdin`/`stdout` to handle data. Hopefully this helps. – Alex Reynolds Jan 08 '14 at 19:19
  • *"I have been trying to recapitulate this in a Python script, using threading. I'd like to use Python in order to manipulate the output from firstCommand before passing it to secondCommand, and again between secondCommand and thirdCommand."* – Alex Reynolds Jan 08 '14 at 19:20
  • ok. Your intent is clear now. Close pipes explicitly (inside threads) and use `.wait()` instead of `.communicate()`. As I understand, the code hangs before any of the `.communicate()` calls are made so they can't affect the processes even if they could. – jfs Jan 08 '14 at 20:49

2 Answers


To emulate:

echo foo |
firstCommand - | somePythonRoutine - |
secondCommand - | anotherPythonRoutine - |
thirdCommand - > finalOutput

your current approach with threads works:

from subprocess import Popen, PIPE

first = Popen(["firstCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1)
second = Popen(["secondCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1)
bind(first.stdout, second.stdin, somePythonRoutine)
with open("finalOutput", "wb") as file:
    third = Popen(["thirdCommand", "-"], stdin=PIPE, stdout=file, bufsize=1)
bind(second.stdout, third.stdin, anotherPythonRoutine)

# provide input for the pipeline
first.stdin.write(b"foo")
first.stdin.close()

# wait for it to complete
pipestatus = [p.wait() for p in [first, second, third]]

where each bind() starts a new thread:

from threading import Thread

def bind(input_pipe, output_pipe, line_filter):
    def f():
        try:
            for line in iter(input_pipe.readline, b''):
                line = line_filter(line)
                if line:
                    output_pipe.write(line) # no flush unless newline present
        finally:
            try:
                output_pipe.close()
            finally:
                input_pipe.close()
    t = Thread(target=f)
    t.daemon = True # die if the program exits
    t.start()

and somePythonRoutine, anotherPythonRoutine accept a single line and return it (possibly modified).
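
As an illustration, here is a minimal sketch of two such filters. The names `drop_comment_lines` and `shout` are hypothetical and not part of the question. The only contract assumed is the one described above: each function takes one line as bytes and returns bytes, and since `bind()` skips falsy results, returning an empty value drops the line.

```python
def drop_comment_lines(line):
    """Hypothetical filter: discard lines that start with '#'."""
    if line.startswith(b"#"):
        return b""  # falsy result, so bind() writes nothing for this line
    return line


def shout(line):
    """Hypothetical filter: upper-case the line, keeping its newline."""
    return line.upper()


if __name__ == "__main__":
    for raw in [b"# header\n", b"foo\n"]:
        print(repr(shout(drop_comment_lines(raw))))
```

Either function could be passed as the `line_filter` argument of `bind()` in place of `somePythonRoutine` or `anotherPythonRoutine`.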

jfs

The point of communicate() is that it returns the output of the process. This collides with your pipe setup.

You should call it only once, on the third process. All the other processes are connected via pipes and know how to communicate with each other, so no other manual intervention is necessary.
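
A minimal sketch of that arrangement follows, assuming Python 3 and no Python code between the stages (if you need to filter the data in Python between commands, this direct wiring does not apply). The three child programs are hypothetical stand-ins for firstCommand, secondCommand and thirdCommand, written as one-line Python scripts so the example runs anywhere. `run_chain` and `child` are names made up for this sketch.

```python
import sys
from subprocess import Popen, PIPE

# Hypothetical stand-ins for the three commands: each child reads all of
# its standard input and writes a transformed copy to standard output.
UPPER = "import sys; sys.stdout.write(sys.stdin.read().upper())"
ZEROS = "import sys; sys.stdout.write(sys.stdin.read().replace('O', '0'))"
TWICE = "import sys; sys.stdout.write(sys.stdin.read() * 2)"


def child(code, stdin, stdout):
    """Start a Python child process running the given one-line script."""
    return Popen([sys.executable, "-c", code], stdin=stdin, stdout=stdout)


def run_chain(data):
    # Each process reads directly from the previous one's stdout pipe.
    first = child(UPPER, PIPE, PIPE)
    second = child(ZEROS, first.stdout, PIPE)
    first.stdout.close()   # the parent no longer needs its copy of this end
    third = child(TWICE, second.stdout, PIPE)
    second.stdout.close()

    # Feed the head of the pipeline, then signal end-of-file.
    first.stdin.write(data)
    first.stdin.close()

    # communicate() is called once, on the last process only.
    output, _ = third.communicate()
    first.wait()
    second.wait()
    return output


if __name__ == "__main__":
    print(run_chain(b"foo"))  # prints b'F00F00'
```

Writing all of the input before reading any output is only safe for small inputs that fit in the pipe buffers. For larger data, the input would have to be fed from a file or from a separate thread.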

Aaron Digulla