I have a Python script that communicates with a C++ program via shell commands. The script calls the C++ program and reads its responses through pipes.

The C++ program buffers its output and blocks the threads that read from the pipes. I solved these problems with this class:

import os
import subprocess
import threading
import queue
import time
import pty


class DaemonCall():
    def __init__(self):
        self.popen = None
        self.stdoutQueue = None
        self.stderrQueue = None
        self.stdoutThread = None
        self.stderrThread = None

    def __del__(self):
        pass

    def call(self, command):
        masterStdout, slaveStdout = pty.openpty()
        masterStderr, slaveStderr = pty.openpty()
        self.popen = subprocess.Popen(command, shell=True, stdout=slaveStdout, stderr=slaveStderr, bufsize=0)
        self.stdoutQueue, self.stdoutThread = self.getAsyncReadQueue(masterStdout)
        self.stderrQueue, self.stderrThread = self.getAsyncReadQueue(masterStderr)

    @classmethod
    def getAsyncReadQueue(cls, source):
        newQueue = queue.Queue()
        # open in binary mode so readline() returns b'' at EOF, matching the iter() sentinel below
        newThread = threading.Thread(target=cls.enqueueOutput, args=(os.fdopen(source, 'rb'), newQueue))
        newThread.daemon = True  # thread dies with the program
        newThread.start()
        return newQueue, newThread

    @staticmethod
    def enqueueOutput(pipe, outputQueue):
        for newLine in iter(pipe.readline, b''):
            outputQueue.put(newLine)
        pipe.close()


callWrapper = DaemonCall()
callWrapper.call('some shell command')
time.sleep(1)
try:
    line = callWrapper.stdoutQueue.get_nowait()  # or q.get(timeout=.1)
except queue.Empty:
    print('no output yet')
else:
    print(line)

Now I have another problem: each call creates two threads to read from the pipes. These threads are blocked by the C++ program and live until the end of the script. I need a way to kill them, ideally with some code in the __del__ method.

Any ideas on how to kill threads that are blocked while reading from pipes?

This all runs on Ubuntu 14.04 with Python 3.4.

Victor Mezrin
  • if your threads are blocked "until the end of script" then presumably your C++ processes are also running for that long (otherwise, you'd reach EOF on reading from the pipe)? Why do you want to kill the threads, if the processes they're listening to are still running? – jwd Feb 11 '15 at 23:02
  • See: http://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread-in-python – jwd Feb 11 '15 at 23:04
  • Why are you creating pseudoterminals? Use `os.pipe()` to create a connected pair of file descriptors with which to communicate with a subprocess. – John Bollinger Feb 11 '15 at 23:04
  • If you have no other way to signal your C++ processes to terminate, then make them terminate when they detect EOF on their standard input. Close the python end of that pipe when you're done with that process. Similarly, you should make your threads exit naturally when they detect EOF on the streams they are reading. Having done that, you should not need to forcibly kill your threads. – John Bollinger Feb 11 '15 at 23:09
  • @JohnBollinger, using os.pipe() helped a lot. Most of the calls I send get an immediate response, and in that case the reading thread closes as expected. But I still have trouble with long-running calls: if a response takes too long, sometimes we no longer need it at all. In that case, manually closing the thread that reads the pipe would be very useful – Victor Mezrin Feb 11 '15 at 23:31
  • @jwd, if the C++ program takes too long to respond and we don't need the outdated response, it's better to kill the process that is reading the pipe. It saves resources: in some cases the script can make 100 calls per minute – Victor Mezrin Feb 11 '15 at 23:36
  • @JohnBollinger, I can't send a call to the C++ program to close a pipe. It is third-party software that has no such API call – Victor Mezrin Feb 11 '15 at 23:40
  • @VictorMezrin, you misunderstand. I suggested that the Python program close *its own* end of the pipe, which it certainly can do. If the C++ program continues to read from its end, then it will (eventually) receive an EOF, which it can take as a signal to terminate. It sounds like that's not going to serve your particular purpose, however, as you seem to want an out-of-band method to terminate these processes. – John Bollinger Feb 12 '15 at 16:05
  • @JohnBollinger: you should not use `os.pipe()` in most cases; `subprocess.PIPE` should be used instead (internally, it might be implemented using `os.pipe()`). If `PIPE` works here instead of `pty` then it should be used because it guarantees that the threads exit if the subprocess dies (it may not be so for `pty`). Though sometimes [`pty` is useful due to reasons outlined here](http://pexpect.readthedocs.org/en/latest/FAQ.html#whynotpipe) – jfs Feb 14 '15 at 16:24
  • @J.F.Sebastian, you should use `os.pipe()` in exactly those cases for which it makes sense. It would be futile to argue how frequently or infrequently those arise, but the one presented in the question certainly is such a case. `subprocess.PIPE` provides only unidirectional communication (subprocess to parent), but bidirectional communication is needed here. `pty` is not appropriate because there is no (human) interactivity involved (and in fact it performed poorly in practice). `os.pipe()` is the right tool for this job. – John Bollinger Feb 15 '15 at 18:58
  • wrong. Read the link: there are two explicit reasons. Look at the subprocess source code to see how PIPE is implemented in terms of os.pipe() -- both are unidirectional. Show me the code for the case in question that uses `os.pipe()` and does something that PIPE can't. – jfs Feb 15 '15 at 19:32
  • @J.F.Sebastian, yes, individual pipes are unidirectional. That's why you create *two* for bidirectional communication, just as the OP did with `pty`s. But you're right that that's a false trail. The fact remains that passing `subprocess.PIPE` as a `subprocess.Popen()` argument and subsequently retrieving the subprocess object's `stdin`, `stdout`, or `stderr` is equivalent to manually setting up pipes to a subprocess with the help of `os.pipe()`. I withdraw my claim that `subprocess.PIPE` is inappropriate, but I still reject your claim that `os.pipe()` is inappropriate. – John Bollinger Feb 17 '15 at 15:57
  • what "OP did with `pty`s" is probably an error. Though I don't know enough to comment. I can understand only when all three standard streams (stdin/stdout/stderr) point to the same pty, [code example](http://stackoverflow.com/a/20509641/4279). I said you should use `PIPE` instead of `os.pipe()` here and you haven't demonstrated any reasons to the contrary. – jfs Feb 17 '15 at 16:11
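
The EOF-on-stdin suggestion from the comments above can be sketched roughly as follows. This is only an illustration under assumptions: the child here is a stand-in Python one-liner (not the third-party C++ program) that exits once it sees EOF on its standard input, and the names `child_code`, `reply` and `return_code` are made up for the example. It only helps if the real child actually reacts to EOF on stdin.

```python
import subprocess
import sys

# Hypothetical stand-in for a child that terminates on EOF on its stdin:
# it echoes each input line in upper case and exits when stdin is closed.
child_code = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    print(line.strip().upper(), flush=True)\n"
)

proc = subprocess.Popen([sys.executable, '-c', child_code],
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE)

# Send one request and read one reply.
proc.stdin.write(b'ping\n')
proc.stdin.flush()
reply = proc.stdout.readline()

# Closing our end of the pipe makes the child see EOF and leave its loop.
proc.stdin.close()
return_code = proc.wait(timeout=10)

# The child has exited, so the read end now reports EOF as well.
remaining = proc.stdout.read()
proc.stdout.close()
```

Once the child exits, any thread blocked in readline() on its stdout gets EOF and can finish on its own, so nothing has to be killed from outside.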

2 Answers

Just kill the subprocess: self.popen.kill(); self.popen.wait(). The threads will exit automatically: resources such as open pipes are freed when the process dies, so pipe.readline() should return an empty result (meaning EOF). This might fail for pty.openpty(), though; in that case, close the pty fds manually.
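
A minimal sketch of the kill-and-wait approach, assuming subprocess.PIPE is used instead of pty. The child is a stand-in Python one-liner that prints one line and then hangs; `child_code`, `enqueue_output` and the other names are illustrative, not taken from the question's code.

```python
import queue
import subprocess
import sys
import threading


def enqueue_output(pipe, output_queue):
    # readline() returns b'' at EOF, which ends the loop and the thread.
    for line in iter(pipe.readline, b''):
        output_queue.put(line)
    pipe.close()


# Hypothetical stand-in for a child that answers once and then blocks.
child_code = "import time; print('ready', flush=True); time.sleep(60)"
proc = subprocess.Popen([sys.executable, '-c', child_code],
                        stdout=subprocess.PIPE, bufsize=0)

lines = queue.Queue()
reader = threading.Thread(target=enqueue_output,
                          args=(proc.stdout, lines), daemon=True)
reader.start()

first_line = lines.get(timeout=10)  # the reader is now blocked in readline()

# Killing the child closes its end of the pipe; the reader sees EOF and exits.
proc.kill()
proc.wait()
reader.join(timeout=10)
reader_alive = reader.is_alive()
```

One caveat: with shell=True, kill() targets the shell, and a grandchild that inherited the pipe may keep it open, in which case the reader would not see EOF until that process exits too.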

You are already using pty (non-portable behavior), so you don't need threads (which you would use to get portable behavior): you could use the pexpect module (a high-level interface around pty), fcntl (non-blocking read), select (wait for multiple fds at once with a timeout), or asyncio instead. See code examples:
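
As one illustration of the select option (a sketch, not the examples originally linked here), the following reads from a child's stdout with a timeout and without any reader thread. It is POSIX-only, the child is a stand-in Python one-liner, and `read_available` is a hypothetical helper name.

```python
import os
import select
import subprocess
import sys


def read_available(fd, timeout):
    """Wait up to `timeout` seconds for data on fd.

    Returns the bytes read, b'' at EOF, or None if nothing arrived in time.
    """
    ready, _, _ = select.select([fd], [], [], timeout)
    if not ready:
        return None
    return os.read(fd, 4096)


# Hypothetical stand-in for a child that answers once and then blocks.
child_code = "import time; print('ready', flush=True); time.sleep(60)"
proc = subprocess.Popen([sys.executable, '-c', child_code],
                        stdout=subprocess.PIPE, bufsize=0)
fd = proc.stdout.fileno()

first = read_available(fd, timeout=10)    # the child's single line of output
second = read_available(fd, timeout=0.2)  # child is sleeping, so this times out

# No thread is stuck in a blocking read, so cleanup is just kill, wait, close.
proc.kill()
proc.wait()
proc.stdout.close()
```

Because the caller decides how long to wait on each read, an outdated response can simply be abandoned by killing the child and closing the descriptor.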

jfs

I have created a class to communicate with other processes via pipes. The class creates separate threads that read from the pipes and uses queues to pass the data to your thread. It's a proven solution that I use in my project.


import time
import subprocess
import queue
import threading

TIMEOUT_POLLINGINTERVAL = 0.5

class ShellCall():
    def __init__(self):
        self._popen = None
        """ :type: subprocess.Popen """
        self._stdOutQueue = None
        """ :type: queue.Queue """
        self._stdErrQueue = None
        """ :type: queue.Queue """
        self._stdOut = []
        self._stdErr = []

    def __del__(self):
        if self._popen and self._popen.poll() is None:
            self._popen.kill()
            self._popen.wait()  # reap the child so it does not linger as a zombie

    def call(self, command, shell=False):
        """
        Execute a shell command

        :param command: command to be executed
        :type command: str | list[str]
        :param shell: If shell is True, the specified command will be executed through the shell
        :type shell: bool
        :rtype: None
        """
        if shell:
            command = command.encode('utf-8')
        else:
            command = [item.encode('utf-8') for item in command]
        self._popen = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=shell, bufsize=0)
        self._stdOutQueue = self._getAsyncReadQueue(self._popen.stdout)
        self._stdErrQueue = self._getAsyncReadQueue(self._popen.stderr)

    def _getAsyncReadQueue(self, sourcePipe):
        """
        Create a thread to read from pipe in asynchronous mode, get queue to receive data from pipe

        :param sourcePipe: Pipe to read from
        :type sourcePipe: pipe
        :return: Queue to receive read data
        :rtype: queue.Queue
        """
        newQueue = queue.Queue()
        newThread = threading.Thread(target=self._enqueueOutput, args=(sourcePipe, newQueue))
        newThread.daemon = True  # thread dies with the program
        newThread.start()
        return newQueue

    @staticmethod
    def _enqueueOutput(sourcePipe, outputQueue):
        """
        Read from pipe and write to the queue

        :param sourcePipe: Pipe to read from
        :type sourcePipe: pipe
        :param outputQueue: Queue to write to
        :type outputQueue: queue.Queue
        """
        for line in iter(sourcePipe.readline, b''):
            outputQueue.put(line)

    def waitNotNoneReturnCode(self, timeout, *, checkCallback=None):
        """
        Wait until any return code

        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param checkCallback: Any callable that will be used to check is shell call finished
        :type checkCallback: callable
        :rtype: None
        """
        self._wait(timeout, notNoneReturnCode=True, checkCallback=checkCallback)

    def waitNoErrorReturnCode(self, timeout, *, checkCallback=None):
        """
        Wait until success return code '0'. Otherwise raise ShellException

        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param checkCallback: Any callable that will be used to check is shell call finished
        :type checkCallback: callable
        :rtype: None
        """
        self._wait(timeout, notNoneReturnCode=True, noErrorReturnCode=True, checkCallback=checkCallback)

    def waitNoStdErr(self, timeout, *, checkCallback=None):
        """
        Wait until success return code '0' and empty stderr. Otherwise raise ShellException

        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param checkCallback: Any callable that will be used to check is shell call finished
        :type checkCallback: callable
        :rtype: None
        """
        self._wait(timeout, notNoneReturnCode=True, noErrorReturnCode=True, noStdErr=True, checkCallback=checkCallback)

    def waitStdOut(self, timeout, *, checkCallback=None):
        """
        Wait until success return code '0', empty stderr and not empty stdout. Otherwise raise ShellException

        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param checkCallback: Any callable that will be used to check is shell call finished
        :type checkCallback: callable
        :rtype: None
        """
        self._wait(timeout, notNoneReturnCode=True, noErrorReturnCode=True,
                   noStdErr=True, stdOut=True, checkCallback=checkCallback)

    def _wait(self, timeout, *, pollingTime=TIMEOUT_POLLINGINTERVAL,
              notNoneReturnCode=False, noErrorReturnCode=False, noStdErr=False, stdOut=False, checkCallback=None):
        """
        Poll until the conditions are satisfied (see :func:`_checkCallResults`).
        Raise ShellException if an error condition is detected or if the conditions are not satisfied before the timeout expires.


        :param timeout: Timeout for executed command (sec). If timeout expired - ShellException raised
        :type timeout: float
        :param pollingTime: Time interval length to check result of command execution
        :type pollingTime: float
        :rtype: None
        """
        startTime = time.time()
        while True:
            if self._checkCallResults(notNoneReturnCode=notNoneReturnCode, noErrorReturnCode=noErrorReturnCode,
                                      noStdErr=noStdErr, stdOut=stdOut, checkCallback=checkCallback):
                return
            # exception due to timeout
            if time.time() - startTime > timeout:
                raise ShellException('Shell call not finished too long', self)
            time.sleep(pollingTime)

    def _checkCallResults(self, notNoneReturnCode=False, noErrorReturnCode=False,
                          noStdErr=False, stdOut=False, checkCallback=None):
        """
        Raise ShellException if noErrorReturnCode=True and the shell call returns a non-zero return code
        Raise ShellException if noStdErr=True and the shell call prints anything to stderr

        :param notNoneReturnCode: return True only if the shell call has returned any return code
        :type notNoneReturnCode: bool
        :param noErrorReturnCode: return True only if shell call return 0 return code
        :type noErrorReturnCode: bool
        :param noStdErr: return True only if shell call print nothing to stderr
        :type noStdErr: bool
        :param stdOut: return True only if shell call print anything to stdout
        :type stdOut: bool
        :param checkCallback: Any callable that will be used to check is shell call finished,
                              positional arguments, keyword arguments
        :type checkCallback: callable, args, kwargs
        :return: True if conditions are satisfied
        :rtype: bool
        """
        # exceptions
        if noErrorReturnCode:
            # any non-zero code is an error; negative codes mean the child was killed by a signal
            if self.getReturnCode() is not None and self.getReturnCode() != 0:
                raise ShellException('Shell call finished with error return code', self)
        if noStdErr:
            if len(self.getStdErr()) > 0:
                raise ShellException('Shell call have non-empty stderr', self)
        # break loop
        notNoneReturnCodeCondition = (self.getReturnCode() is not None) if notNoneReturnCode else True
        noErrorReturnCodeCondition = (self.getReturnCode() == 0) if noErrorReturnCode else True
        notStdErrCondition = (len(self.getStdErr()) == 0) if noStdErr else True
        stdOutCondition = (len(self.getStdOut()) > 0) if stdOut else True
        callbackCondition = checkCallback() if checkCallback else True
        if notNoneReturnCodeCondition and noErrorReturnCodeCondition and \
                notStdErrCondition and stdOutCondition and callbackCondition:
            return True
        else:
            return False

    def getReturnCode(self):
        """
        Get return code of the process

        :return: return code of the child process or None if process is not terminated yet
        :rtype: int|None
        """
        return self._popen.poll()

    def getStdOut(self):
        """
        Get list with stdout lines

        :rtype: list[str]
        """
        self._stdOut += self._readAllQueue(self._stdOutQueue)
        return self._stdOut

    def getStdErr(self):
        """
        Get list with stderr lines

        :rtype: list[str]
        """
        self._stdErr += self._readAllQueue(self._stdErrQueue)
        return self._stdErr

    @staticmethod
    def _readAllQueue(sourceQueue):
        lines = []
        try:
            while True:
                line = sourceQueue.get_nowait()  # or q.get(timeout=.1)
                line = line.decode('utf-8').rstrip()
                lines.append(line)
        except queue.Empty:
            return lines

    def __repr__(self):
        stdOut = str.join(' ', self.getStdOut())
        stdOut = (stdOut[:1000] + '...') if len(stdOut) > 1000 else stdOut
        stdErr = str.join(' ', self.getStdErr())
        stdErr = (stdErr[:1000] + '...') if len(stdErr) > 1000 else stdErr
        return '<ShellCall(command={}, ReturnCode={}, stdout="{}", stderr="{}")>'. \
            format(self._popen.args, self.getReturnCode(), stdOut, stdErr)


class ShellException(Exception):
    def __init__(self, description, shellCall):
        """
        :param description: text description of the error
        :type description: str
        :param shellCall: shell call object used to execute a command
        :type shellCall: ShellCall
        :rtype: None
        """
        super().__init__(description, shellCall)

    def getShellCall(self):
        """
        Get shell call object used to execute a command

        :rtype: ShellCall
        """
        description, shellCall = self.args
        return shellCall
Victor Mezrin