I have many shell commands that need to be executed from my Python script. I know that I shouldn't use shell=True, as mentioned here, and that I can connect the standard outputs and inputs of the processes when the command contains pipes, as mentioned here.

But the problem is that my shell commands are complex and full of pipes, so I'd like to make a generic method to be used by my script.

I made a small test below, but it hangs after printing the result (I simplified it to post here). Can somebody please let me know:

  1. Why it is hanging.
  2. If there's a better method of doing this.

Thanks.

PS: This is just a small portion of a big python project and there are business reasons why I'm trying to do this. Thanks.

#!/usr/bin/env python3
import subprocess as sub
from subprocess import Popen, PIPE
import shlex

def exec_cmd(cmd,p=None,isFirstLoop=True):
   if not isFirstLoop and not p:
       print("Error, p is null")
       exit()
   if "|" in cmd:
       cmds = cmd.split("|")
       while "|" in cmd:
           # separates what is before and what is after the first pipe
           now_cmd = cmd.split('|',1)[0].strip()
           next_cmd = cmd.split('|',1)[-1].strip()
           try:
               if isFirstLoop:
                   p1 = sub.Popen(shlex.split(now_cmd), stdout=PIPE)
                   exec_cmd(next_cmd,p1,False)
               else:
                   p2 = sub.Popen(shlex.split(now_cmd),stdin=p.stdout, stdout=PIPE)
                   exec_cmd(next_cmd,p2,False)
           except Exception as e:
               print("Error executing command '{0}'.\nOutput:\n:{1}".format(cmd,str(e)))
               exit()
           # Adjust cmd to execute the next part
           cmd = next_cmd
   else:
       proc = sub.Popen(shlex.split(cmd),stdin=p.stdout, stdout=PIPE, universal_newlines=True)
       (out,err) = proc.communicate()
       if err:
           print(str(err).strip())
       else:
           print(out)



exec_cmd("ls -ltrh | awk '{print $9}' | wc -l ")
Arthur Accioly
  • You might have some luck posting this on the code review stack exchange site. As I see it, there are some potential problems with the way that commands are parsed, very similar problems to the ones with `shell=True`… this code is not much better. Second, if you create a pipe using `stdout=PIPE` you can then pass `stdin=proc.stdout` to use the same pipe, which is a better way to connect programs. But it requires a bit of thinking. – Dietrich Epp Aug 08 '18 at 23:31
  • Nice, let me try both suggestions. Thanks. – Arthur Accioly Aug 09 '18 at 13:19
  • @DietrichEpp Code Review accepts neither code that is not working nor code that is simplified for the purpose of demonstrating a specific behaviour. This question has a clear problem (code hangs) that can be addressed on SO. – 301_Moved_Permanently Aug 09 '18 at 13:41
  • Here's a simple way to do it: https://gist.github.com/depp/c798381d6810657f528fe8d0d8013729 – Dietrich Epp Aug 09 '18 at 14:13
  • @DietrichEpp, I believe that your answer should also be added to the answers below. Thanks. – Arthur Accioly Aug 13 '18 at 21:29
  • @ArthurAccioly: I didn't want to post an answer with code I didn't stand behind. It was a comment because there were some edge cases yet to consider. – Dietrich Epp Aug 14 '18 at 14:30

2 Answers

Instead of taking a shell string and trying to parse it yourself, I'd ask the caller to provide the commands as separate entities. This avoids the obvious trap of mistaking a | that is part of a command for a shell pipe. Whether you ask for each command as a list of strings or as a single string that you shlex.split afterwards depends on the interface you want to expose. I chose the first for its simplicity in the following example.
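To make the trap concrete, here is a small sketch. The command string and the file name notes.txt are made up for illustration; nothing is executed, the strings are only tokenized.

```python
import shlex

# Hypothetical pipeline whose grep pattern contains a literal "|".
cmd = "grep 'a|b' notes.txt | wc -l"

# Naive splitting cuts through the quoted pattern and yields three pieces.
naive = [part.strip() for part in cmd.split("|")]
print(naive)

# Passing each stage separately and tokenizing it with shlex keeps the
# quoted pattern as a single argument.
stages = [shlex.split(stage) for stage in ("grep 'a|b' notes.txt", "wc -l")]
print(stages)
```

The first list contains a broken grep command, while the second has exactly two well-formed argument lists.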

Once you have the individual commands, a simple for loop is enough to pipe outputs of the previous commands to inputs of the next ones, as you have found yourself:

import subprocess

def pipe_subprocesses(*commands):
    if not commands:
        return

    next_input = None
    for command in commands:
        # Feed the previous command's stdout (if any) into this command's stdin.
        p = subprocess.Popen(command, stdin=next_input, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        next_input = p.stdout

    # Only the last process is read from and waited for.
    out, err = p.communicate()
    if err:
        print(err.decode().strip())
    else:
        print(out.decode())

Usage being:

>>> pipe_subprocesses(['ls', '-lhtr'], ['awk', '{print $9}'], ['wc', '-l'])
25

Now this is a quick and dirty way to get it set up and seemingly working as you want. But there are at least two issues with this code:

  1. You leak zombie processes and open process handles, because only the last process's exit code is collected and the OS keeps resources open until the others are collected too;
  2. You can't access information about a process that fails midway through the pipeline.

To avoid that, you need to maintain a list of opened processes and explicitly wait for each of them. And because I don't know your exact use case, I'll just return the first process that failed (if any) or the last process (if not) so you can act accordingly:

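import shlex
import subprocess

def pipe_subprocesses(*commands):
    if not commands:
        return

    processes = []
    next_input = None
    for command in commands:
        # Accept either a raw string or an already split argument list.
        if isinstance(command, str):
            command = shlex.split(command)
        p = subprocess.Popen(command, stdin=next_input, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        next_input = p.stdout
        processes.append(p)

    # Collect every exit code so that no child is left as a zombie.
    # Caveat: wait() can block if a child fills a pipe that nobody reads.
    for p in processes:
        p.wait()

    for p in processes:
        if p.returncode != 0:
            return p  # first process that failed
    return p  # return the last process in case everything went well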

I also threw in some shlex as an example so you can mix raw strings and already parsed lists:

>>> pipe_subprocesses('ls -lhtr', ['awk', '{print $9}'], 'wc -l')
25
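One caveat with waiting on every process first: the Python documentation warns that wait() can deadlock when a child writes more to a pipe than the OS buffer holds and nobody reads it. The following is a minimal sketch of one way around that, assuming only the final output and the exit codes are needed. The name pipe_and_collect is hypothetical, stderr is left attached to the parent instead of being piped, and the parent's copies of the intermediate pipes are closed.

```python
import shlex
import subprocess

def pipe_and_collect(*commands):
    """Hypothetical variant: return (list of exit codes, final stdout bytes)."""
    if not commands:
        return [], b""

    processes = []
    next_input = None
    for command in commands:
        if isinstance(command, str):
            command = shlex.split(command)
        p = subprocess.Popen(command, stdin=next_input, stdout=subprocess.PIPE)
        # The child now owns its end of the pipe; drop the parent's copy.
        if next_input is not None:
            next_input.close()
        next_input = p.stdout
        processes.append(p)

    # Drain the last process's output before waiting on the others.
    out, _ = processes[-1].communicate()
    for p in processes:
        p.wait()
    return [p.returncode for p in processes], out

codes, out = pipe_and_collect("echo hello", ["tr", "a-z", "A-Z"])
print(codes, out)
```

Returning every exit code lets the caller see which stage of the pipeline failed, not just that one did.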
301_Moved_Permanently
  • This will lose errors (return codes of all processes are ignored), potentially leak processes (does not wait for all children), and the input to the first process should not be a pipe if you are not going to write anything to it. – Dietrich Epp Aug 09 '18 at 14:15
  • The code I posted is "here is how you do this" with no real explanation or context, I don't consider it a quality answer for Stack Overflow. There are unfortunately some subtle bits in how you do this and I don't have the time for a writeup, so I posted a gist instead. – Dietrich Epp Aug 09 '18 at 14:27
  • Yes, I mentioned that the split was incorrect in the comments which is why the function is split into two. However preallocating the array `procs` seems a bit absurd to me, because you are making the code more complicated and difficult to understand for a performance benefit that might not even be measurable--you have to fork/exec each time through the loop, and that's orders of magnitude more expensive than `append`. – Dietrich Epp Aug 09 '18 at 14:58

This unfortunately has a few edge cases in it that the shell takes care of for you, or alternatively, that the shell completely ignores for you. Some concerns:

  • The function should always wait() for every process to finish, or else you will get what are called zombie processes.

  • The commands should be connected to each other using real pipes, so that the entire output doesn't need to be read into memory at once. This is the normal way pipes work.

  • The read end of every pipe should be closed in the parent process, so children can properly SIGPIPE when the next process closes its input. Without this, the parent process can keep the pipe open and the child does not know to exit, and it may run forever.

  • Errors in child processes should be raised as exceptions, except SIGPIPE. It is left as an exercise to the reader to raise exceptions for SIGPIPE on the final process because SIGPIPE is not expected there, but ignoring it is not harmful.
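The point about closing the read end in the parent can be seen in isolation with two processes. This is only a sketch, assuming a POSIX system where yes and head are available:

```python
import subprocess

# `yes` prints "y" forever; `head -n 1` exits after reading one line.
producer = subprocess.Popen(["yes"], stdin=subprocess.DEVNULL,
                            stdout=subprocess.PIPE)
consumer = subprocess.Popen(["head", "-n", "1"], stdin=producer.stdout,
                            stdout=subprocess.PIPE)

# Drop the parent's copy of the read end. Once `head` exits, no reader is
# left, so the next write by `yes` fails and it terminates.
producer.stdout.close()

out, _ = consumer.communicate()
producer.wait()  # returns promptly because the pipe has no readers left
print(out, producer.returncode)
```

If the close() call were omitted, the parent would still hold the read end open, yes would block once the pipe buffer filled up, and producer.wait() would not return.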

Note that subprocess.DEVNULL does not exist prior to Python 3.3. If you are still living with 2.x, you will have to open a file for /dev/null manually, or just decide that the first process in the pipeline gets to share stdin with the parent process.
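For the manual route, a minimal sketch could look like the following. It uses os.devnull to locate the null device, and cat is only a stand-in for the first command of a pipeline:

```python
import os
import subprocess

# Fallback for interpreters without subprocess.DEVNULL: open the null
# device by hand and pass it as the first process's stdin.
with open(os.devnull, "rb") as devnull:
    proc = subprocess.Popen(["cat"], stdin=devnull, stdout=subprocess.PIPE)
    out, _ = proc.communicate()

print(repr(out), proc.returncode)
```

Because the null device yields end-of-file immediately, cat exits right away with empty output instead of waiting on the terminal.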

Here is the code:

import signal
import subprocess

def run_pipe(*cmds):
    """Run a pipe that chains several commands together."""
    pipe = subprocess.DEVNULL
    procs = []
    try:
        for cmd in cmds:
            proc = subprocess.Popen(cmd, stdin=pipe,
                                    stdout=subprocess.PIPE)
            procs.append(proc)
            if pipe is not subprocess.DEVNULL:
                pipe.close()
            pipe = proc.stdout
        stdout, _ = proc.communicate()
    finally:
        # Must call wait() on every process, otherwise you get
        # zombies.
        for proc in procs:
            proc.wait()
    # Fail if any command in the pipe failed, except due to SIGPIPE
    # which is expected.
    for proc in procs:
        if (proc.returncode
            and proc.returncode != -signal.SIGPIPE):
            raise subprocess.CalledProcessError(
                proc.returncode, proc.args)
    return stdout

Here we can see it in action. You can see that the pipeline correctly terminates with yes (which runs until SIGPIPE) and correctly fails with false (which always fails).

In [1]: run_pipe(["yes"], ["head", "-n", "1"])
Out[1]: b'y\n'

In [2]: run_pipe(["false"], ["true"])
---------------------------------------------------------------------------
CalledProcessError                        Traceback (most recent call last)
<ipython-input-2-db97c6876cd7> in <module>()
----> 1 run_pipe(["false"], ["true"])

~/test.py in run_pipe(*cmds)
     22     for proc in procs:
     23         if proc.returncode and proc.returncode != -signal.SIGPIPE:
---> 24             raise subprocess.CalledProcessError(proc.returncode, proc.args)
     25     return stdout

CalledProcessError: Command '['false']' returned non-zero exit status 1
Dietrich Epp