32

I need to run an interactive Bash instance in a separated process in Python with it's own dedicated TTY (I can't use pexpect). I used this code snippet I commonly see used in similar programs:

master, slave = pty.openpty()

p = subprocess.Popen(["/bin/bash", "-i"], stdin=slave, stdout=slave, stderr=slave)

os.close(slave)

x = os.read(master, 1026)

print x

subprocess.Popen.kill(p)
os.close(master)

But when I run it I get the following output:

$ ./pty_try.py
bash: cannot set terminal process group (10790): Inappropriate ioctl for device
bash: no job control in this shell

Strace of the run shows some errors:

...
readlink("/usr/bin/python2.7", 0x7ffc8db02510, 4096) = -1 EINVAL (Invalid argument)
...
ioctl(3, SNDCTL_TMR_TIMEBASE or SNDRV_TIMER_IOCTL_NEXT_DEVICE or TCGETS, 0x7ffc8db03590) = -1 ENOTTY (Inappropriate ioctl for device)
...
readlink("./pty_try.py", 0x7ffc8db00610, 4096) = -1 EINVAL (Invalid argument)

The code snippet seems pretty straightforward, is Bash not getting something it needs? what could be the problem here?

Dima Tisnek
  • 11,241
  • 4
  • 68
  • 120
TKKS
  • 513
  • 1
  • 4
  • 9
  • 2
    That's quite normal — you got an **interactive shell** without **job control**. – Dima Tisnek Jan 09 '17 at 09:23
  • 2
    If you want job control too, you need your shell to become a process leader — that is start new "session", it's achieved with `start_new_session=True` keyword argument to `Popen` (since Python 3.2). If you need more control, use `preexec_fn=...` – Dima Tisnek Jan 09 '17 at 09:28
  • Ok, that sound reasonable. I understand that the `start_new_session=True` is only relevant to >3.2. Is there an equivalent in 2.7? Sorry, probably should have mentioned the python version in the question. – TKKS Jan 09 '17 at 10:28
  • 2
    You can do that by hand by calling `setsid()` in `preexec_fn` via `ctypes` – Dima Tisnek Jan 09 '17 at 10:38
  • I think this question is about same fundamentals as http://stackoverflow.com/questions/23826695/handling-keyboard-interrupt-when-using-subproccess/23839524#23839524 http://stackoverflow.com/questions/33119213/run-program-in-another-process-and-receive-pid-in-python/33120039#33120039 http://stackoverflow.com/questions/37737649/how-to-destroy-an-exe-filenot-converted-from-py-by-run-as-the-same-script/37776347#37776347 http://stackoverflow.com/questions/13243807/popen-waiting-for-child-process-even-when-the-immediate-child-has-terminated/13256908#13256908 it could be considered a duplicate. – Dima Tisnek Jan 10 '17 at 10:26
  • I don't think any of them is really about using the pseudo-terminal with Popen like this. I don't think I could have solved this issue with any of these other questions. I will publish my solution code. – TKKS Jan 11 '17 at 14:04
  • How to do this on Windows, using the cmd shell instead of bash? – K.Mulier Feb 23 '21 at 21:16

3 Answers3

24

This is a solution to run an interactive command in subprocess. It uses pseudo-terminal to make stdout non-blocking(also some command needs a tty device, eg. bash). it uses select to handle input and ouput to the subprocess.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()


try:
    # use os.setsid() make it run in a new process group, or bash job control will not be enabled
    p = Popen(command,
              preexec_fn=os.setsid,
              stdin=slave_fd,
              stdout=slave_fd,
              stderr=slave_fd,
              universal_newlines=True)

    while p.poll() is None:
        r, w, e = select.select([sys.stdin, master_fd], [], [])
        if sys.stdin in r:
            d = os.read(sys.stdin.fileno(), 10240)
            os.write(master_fd, d)
        elif master_fd in r:
            o = os.read(master_fd, 10240)
            if o:
                os.write(sys.stdout.fileno(), o)
finally:
    # restore tty settings back
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
JnBrymn
  • 24,245
  • 28
  • 105
  • 147
Paco
  • 411
  • 3
  • 9
  • From docs, `pty.spawn` seem to do similar stuff but simpler interface. https://docs.python.org/3/library/pty.html#example Are they doing the same? – balki Mar 26 '19 at 15:26
  • @balki [pty.spawn](https://docs.python.org/3/library/pty.html#pty.spawn) is blocking: "It is expected that the process spawned behind the pty will eventually terminate, and when it does spawn will return." – moi Mar 18 '20 at 20:40
  • @balki I've looked at the source code on https://github.com/python/cpython/blob/master/Lib/pty.py#L151. I think `pty.spawn` is doing basically the same thing as this code snippet does. So yes, use `pty.spawn` as it would make application code simpler. – Paco Jul 19 '20 at 05:45
  • 1
    if the process exits without output this hangs waiting for input (setting a timeout on `select.select` or setting it to nonblocking "fixes" this). this also needs a little work for window sizing (SIGWINCH, etc.) – anthony sottile Sep 05 '20 at 17:47
  • 1
    Is it possible to separate stdout/stderr ? – Noortheen Raja Feb 27 '22 at 20:05
  • in my case the subprocess required an interactive tty ... so when i tried doing bash `app.py &` it error'd out. then i tried passing `stdin=subprocess.DEVNULL` and that didn't work. then i tried passing an open file descriptor and that didn't work. finally i opened the `pty.openpty()` and passed in to the subprocess `stdin=slave_fd` and it worked. – Trevor Boyd Smith Jun 02 '22 at 15:03
8

This is the solution that worked for me at the end (as suggested by qarma) :

libc = ctypes.CDLL('libc.so.6')

master, slave = pty.openpty()
p = subprocess.Popen(["/bin/bash", "-i"], preexec_fn=libc.setsid, stdin=slave, stdout=slave, stderr=slave)
os.close(slave)

... do stuff here ...

x = os.read(master, 1026)
print x
TKKS
  • 513
  • 1
  • 4
  • 9
1

Here is a full object oriented solution to do interactive shell commands with TTYs using threads and queues for stdout and stderr IO handling. This took me a while to build from multiple locations but it works perfectly so far on Unix/Linux systems and also as part of a Juniper op script. Thought I would post this here to save others time in trying to build something like this.

import pty
import re
import select
import threading
from datetime import datetime, timedelta
import os
import logging
import subprocess
import time
from queue import Queue, Empty

lib_logger = logging.getLogger("lib")

# Handler function to be run as a thread for pulling pty channels from an interactive shell
def _pty_handler(pty_master, logger, queue, stop):
    poller = select.poll()
    poller.register(pty_master, select.POLLIN)
    while True:
        # Stop handler if flagged
        if stop():
            logger.debug("Disabling pty handler for interactive shell")
            break

        fd_event = poller.poll(100)
        for descriptor, event in fd_event:
            # Read data from pipe and send to queue if there is data to read
            if event == select.POLLIN:
                data = os.read(descriptor, 1).decode("utf-8")
                if not data:
                    break
                # logger.debug("Reading in to handler queue: " + data)
                queue.put(data)
            # Exit handler if stdout is closing
            elif event == select.POLLHUP:
                logger.debug("Disabling pty handler for interactive shell")
                break


# Function for reading outputs from the given queue by draining it and returning the output
def _get_queue_output(queue: Queue) -> str:
    value = ""
    try:
        while True:
            value += queue.get_nowait()
    except Empty:
        return value


# Helper function to create the needed list for popen and print the command run to the logger
def popen_command(command, logger, *args):
    popen_list = list()
    popen_list.append(command)
    command_output = command
    for arg in args:
        popen_list.append(arg)
        command_output += " " + arg
    lib_logger.debug("Making Popen call using: " + str(popen_list))
    logger.debug("")
    logger.debug(command_output)
    logger.debug("")

    return popen_list


# Class for create an interactive shell and sending commands to it along with logging output to loggers
class InteractiveShell(object):
    def __init__(self, command, logger, *args):
        self.logger = logger
        self.command = command
        self.process = None
        self.popen_list = popen_command(command, logger, *args)
        self.master_stdout = None
        self.slave_stdout = None
        self.master_stderr = None
        self.slave_stderr = None
        self.stdout_handler = None
        self.stderr_handler = None
        self.stdout_queue = None
        self.stderr_queue = None
        self.stop_handlers = False

    # Open interactive shell and setup all threaded IO handlers
    def open(self, shell_prompt, timeout=DEVICE_TIMEOUT):
        # Create PTYs
        self.master_stdout, self.slave_stdout = pty.openpty()
        self.master_stderr, self.slave_stderr = pty.openpty()

        # Create shell subprocess
        self.process = subprocess.Popen(self.popen_list, stdin=self.slave_stdout, stdout=self.slave_stdout,
                                        stderr=self.slave_stderr, bufsize=0, start_new_session=True)

        lib_logger.debug("")
        lib_logger.debug("Started interactive shell for command " + self.command)
        lib_logger.debug("")

        # Create thread and queues for handling pty output and start them
        self.stdout_queue = Queue()
        self.stderr_queue = Queue()
        self.stdout_handler = threading.Thread(target=_pty_handler, args=(self.master_stdout,
                                                                          lib_logger,
                                                                          self.stdout_queue,
                                                                          lambda: self.stop_handlers))
        self.stderr_handler = threading.Thread(target=_pty_handler, args=(self.master_stderr,
                                                                          lib_logger,
                                                                          self.stderr_queue,
                                                                          lambda: self.stop_handlers))
        self.stdout_handler.daemon = True
        self.stderr_handler.daemon = True
        lib_logger.debug("Enabling stderr handler for interactive shell " + self.command)
        self.stderr_handler.start()
        lib_logger.debug("Enabling stdout handler for interactive shell " + self.command)
        self.stdout_handler.start()

        # Wait for shell prompt
        lib_logger.debug("Waiting for shell prompt: " + shell_prompt)
        return self.wait_for(shell_prompt, timeout)

    # Close interactive shell which should also kill all threaded IO handlers
    def close(self):
        # Wait 5 seconds before closing to let shell handle all input and outputs
        time.sleep(5)

        # Stop IO handler threads and terminate the process then wait another 5 seconds for cleanup to happen
        self.stop_handlers = True
        self.process.terminate()
        time.sleep(5)

        # Check for any additional output from the stdout handler
        output = ""
        while True:
            data = _get_queue_output(self.stdout_queue)
            if data != "":
                output += data
            else:
                break
        for line in iter(output.splitlines()):
            self.logger.debug(line)

        # Check for any additional output from the stderr handler
        output = ""
        while True:
            data = _get_queue_output(self.stderr_queue)
            if data != "":
                output += data
            else:
                break
        for line in iter(output.splitlines()):
            self.logger.error(line)

        # Cleanup PTYs
        os.close(self.master_stdout)
        os.close(self.master_stderr)
        os.close(self.slave_stdout)
        os.close(self.slave_stderr)

        lib_logger.debug("Interactive shell command " + self.command + " terminated")

    # Run series of commands given as a list of a list of commands and wait_for strings. If no wait_for is needed then
    # only provide the command. Return if all the commands completed successfully or not.
    # Ex:
    # [
    #     ["ssh jsas@" + vnf_ip, r"jsas@.*:"],
    #     ["juniper123", r"jsas@.*\$"],
    #     ["sudo su", r".*jsas:"],
    #     ["juniper123", r"root@.*#"],
    #     ["usermod -p 'blah' jsas"]
    # ]
    def run_commands(self, commands_list):
        shell_status = True
        for command in commands_list:
            shell_status = self.run(command[0])
            if shell_status and len(command) == 2:
                shell_status = self.wait_for(command[1])

            # Break out of running commands if a command failed
            if not shell_status:
                break

        return shell_status

    # Run given command and return False if error occurs otherwise return True
    def run(self, command, sleep=0):
        # Check process to make sure it is still running and if not grab the stderr output
        if self.process.poll():
            self.logger.error("Interactive shell command " + self.command + " closed with return code: " +
                              self.process.returncode)
            data = _get_queue_output(self.stderr_queue)
            if data != "":
                self.logger.error("Interactive shell error messages:")
                for line in iter(data.splitlines()):
                    self.logger.error(line)
            return False

        # Write command to process and check to make sure a newline is in command otherwise add it
        if "\n" not in command:
            command += "\n"
        os.write(self.master_stdout, command.encode("utf-8"))
        if sleep:
            time.sleep(sleep)

        return True

    # Wait for specific regex expression in output before continuing return False if wait time expires otherwise return
    # True
    def wait_for(self, this, timeout=DEVICE_TIMEOUT):
        timeout = datetime.now() + timedelta(seconds=timeout)
        output = ""

        # Keep searching for output until timeout occurs
        while timeout > datetime.now():
            data = _get_queue_output(self.stdout_queue)
            if data != "":
                # Add to output line and check for match to regex given and if match then break and send output to
                # logger
                output += data
                lib_logger.debug("Checking for " + this + " in data: ")
                for line in iter(output.splitlines()):
                    lib_logger.debug(line)
                if re.search(r"{}\s?$".format(this), output):
                    break
            time.sleep(1)

        # Send output to logger
        for line in iter(output.splitlines()):
            self.logger.debug(line)

        # If wait time expired print error message and return False
        if timeout < datetime.now():
            self.logger.error("Wait time expired when waiting for " + this)
            return False

        return True
TheLizzard
  • 7,248
  • 2
  • 11
  • 31