
I am writing a watchdog, of sorts, for processes in a test suite. I need to determine if a test hangs.

I could simply start the process with subprocess.Popen(...), and use Popen.wait(timeout=to) or Popen.poll() and keep my own timer. However, the tests differ greatly in execution time, which makes it impossible to have a good 'timeout' value that is sensible for all tests.

I have found that a good way to determine if a test has hung is to have a 'timeout' for the last time the process output anything. To that end, I considered using

process = subprocess.Popen(args='<program>', stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ...)

and Popen.communicate() to determine when stdout and/or stderr are not None. The problem is that Popen.communicate() without a 'timeout' simply waits until the process terminates, and with a 'timeout' it raises a TimeoutExpired exception, from which I can't determine whether anything was read. TimeoutExpired.output is empty, BTW.

I could not find anything in the documentation that allows one to perform the 'reads' manually. Also, there is usually a lot of output from the process, so starting it with stdout=<open_file_descriptor> would be beneficial, as I would have no concern for overflowing pipe buffers.

Update/Solution:

Popen.stdout and Popen.stderr are "readable stream objects", which one can use to manually poll/select and read. I ended up using select 'Polling Objects', which use the poll() system call, as below:

import os
import select
import subprocess

p = subprocess.Popen(args="<program>", shell=True, universal_newlines=True,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)
poll_obj = select.poll()
poll_obj.register(p.stdout, select.POLLIN)
poll_obj.register(p.stderr, select.POLLIN)
open_fds = {p.stdout.fileno(), p.stderr.fileno()}

while open_fds:  # loop until both pipes reach EOF, so no output is lost
    for fd, event in poll_obj.poll(10):  # timeout is in milliseconds
        data = os.read(fd, 1024)
        if data:
            print("STDOUT: " if fd == p.stdout.fileno() else "STDERR: ")
            print(data.decode())
        else:  # EOF (reported as POLLHUP): stop polling this fd
            poll_obj.unregister(fd)
            open_fds.discard(fd)
p.wait()
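The polling loop above only reads the output; it does not yet enforce the "time since last output" limit. A minimal sketch of how that could be layered on top, assuming Unix and Python 3, with stderr merged into stdout as in the first Popen call of the question. The function name `run_with_idle_timeout` and its return shape are illustrative choices, not part of any library:

```python
import os
import select
import subprocess
import time


def run_with_idle_timeout(cmd, idle_timeout):
    """Run cmd and kill it if it prints nothing for idle_timeout seconds.

    Returns (returncode, hung, output). Hypothetical helper for illustration.
    """
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    fd = p.stdout.fileno()
    poll_obj = select.poll()
    poll_obj.register(fd, select.POLLIN)
    chunks = []
    hung = False
    last_output = time.monotonic()
    while True:
        remaining = idle_timeout - (time.monotonic() - last_output)
        if remaining <= 0:  # no output for idle_timeout seconds
            hung = True
            p.kill()
            break
        # poll() takes milliseconds; round up so we never spin with 0 ms
        if poll_obj.poll(int(remaining * 1000) + 1):
            data = os.read(fd, 4096)
            if not data:  # EOF: the child closed its output
                break
            chunks.append(data)
            last_output = time.monotonic()  # restart the idle timer
    p.stdout.close()
    p.wait()
    return p.returncode, hung, b"".join(chunks)
```

Each test then gets the same idle limit regardless of its total run time, e.g. `run_with_idle_timeout(["<program>"], 60)`. Collecting the output in memory is only for the sketch; writing each chunk to a log file would avoid holding it all.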
jfs
Ramon

2 Answers

2

This is kind of covered here.

Essentially you need to use select() to poll the fds to see if they have input:

#!/usr/bin/python

import fcntl
import os
import select
import subprocess


def setnonblocking(fd):
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    return fd

p = subprocess.Popen("/bin/sh -c 'c=10; while [ $c -gt 0 ]; do echo $c hello; sleep 1; >&2 echo world; sleep 1; let c=$c-1; done'", stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)

process_fds = map(setnonblocking, [p.stdout, p.stderr])

while process_fds:
    readable, writable, exceptional = select.select(process_fds, [], process_fds, 100)
    print "Select: ", readable, writable, exceptional
    print "Exitcode: ", p.poll()
    for fd in readable:
        data = os.read(fd.fileno(), 1024)
        if data == "":  # EOF
            process_fds.remove(fd)
            continue
        if fd == p.stdout:
            print "STDOUT: ",
        if fd == p.stderr:
            print "STDERR: ",
        print data,
    for fd in exceptional:
        process_fds.remove(fd)

Output:

Select:  [<open file '<fdopen>', mode 'rb' at 0x7fed75daa6f0>] [] []
Exitcode:  None
STDOUT:  10 hello
Select:  [<open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
Exitcode:  None
STDERR:  world
Select:  [<open file '<fdopen>', mode 'rb' at 0x7fed75daa6f0>] [] []
Exitcode:  None
STDOUT:  9 hello
Select:  [<open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
Exitcode:  None
[...]
STDOUT:  1 hello
Select:  [<open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
Exitcode:  None
STDERR:  world
Select:  [<open file '<fdopen>', mode 'rb' at 0x7fed75daa6f0>, <open file '<fdopen>', mode 'rb' at 0x7fed75daa660>] [] []
Exitcode:  1

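os.read() is used instead of fd.read() because you need to read in a non-line-oriented way. fd.read() waits for EOF (or for the requested number of bytes) and fd.readline() waits for a newline, so neither is guaranteed to return promptly just because select() reported the fd as readable. os.read() returns whatever is available, up to the requested size. With this method you can also keep stderr and stdout separate.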
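To illustrate the point about os.read(), here is a small sketch in Python 3 (the answer's code above is Python 2). The child command is a made-up example: it writes a fragment with no trailing newline and then stays alive, which is exactly the case where a line-oriented read would keep waiting.

```python
import os
import select
import subprocess
import sys

# Hypothetical child: writes a fragment without a newline, then keeps running.
child = subprocess.Popen(
    [sys.executable, "-c",
     "import sys, time; sys.stdout.write('partial'); "
     "sys.stdout.flush(); time.sleep(2)"],
    stdout=subprocess.PIPE)

# Wait (up to 10 s) until the pipe is readable, then take whatever is there.
readable, _, _ = select.select([child.stdout], [], [], 10)
chunk = os.read(child.stdout.fileno(), 1024) if readable else b""
print(repr(chunk))  # the fragment, with no newline or EOF needed

# Clean up the still-sleeping child.
child.kill()
child.wait()
child.stdout.close()
```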

edit: Revised to handle process exiting before EOF of p.stdout and p.stderr

Community
rrauenza
  • @J.F.Sebastian Thanks for catching that! I've revised just to loop until both fds are EOF. – rrauenza Jun 16 '16 at 03:35
  • Use `while some_list:` instead of `while len(some_list):` in Python. I'm not sure it is safe to use `select()` with blocking fds. – jfs Jun 16 '16 at 03:39
  • @J.F.Sebastian Style wise, I generally prefer the explicitness of len(list). From the `select()` system call man page: *A file descriptor is considered ready if it is possible to perform the corresponding I/O operation (e.g., read(2)) without blocking.* That implies the fd could otherwise block. – rrauenza Jun 16 '16 at 03:43
  • Using `while len(items):` instead of `while items:` is not idiomatic. Are you sure `select()` can't return earlier e.g., on a signal? Also, you should probably exit the loop on timeout. – jfs Jun 16 '16 at 04:05
  • It seems whether and how `select()` is interrupted by a signal may depend on the platform and the python version (e.g., an exception may be raised, it may be automatically restarted). I've checked: it is possible that `os.read()` blocks after `select.select()` (because some other processes may consume the pipe or (in case of sockets) due to some weirdness in the network stack implementation: data may be reported ready and then discarded by the kernel for whatever reason)... – jfs Jun 16 '16 at 19:22
  • ...continued: If you want to write to a pipe more than one byte at a time then it is also a good idea to make the pipe non-blocking (`PIPE_BUF` promises atomicity, not whether os.write() may block). Given the number of caveats, bugs, differences in behavior between platforms regarding IO, signals, threading; it is much simpler to make the fds non-blocking even if you can use blocking fds in this specific case. – jfs Jun 16 '16 at 19:23
  • @J.F.Sebastian Changed it to non-blocking, but I'm not entirely happy with the method for determining `EOF`. The fds don't get thrown into `exceptional` as I expected (was looking at examples at https://pymotw.com/2/select/) – rrauenza Jun 16 '16 at 20:32
  • I don't see any issues with EOF. `select()` returns the fd as ready on EOF and `os.read()` in turn returns `b''` on EOF. – jfs Jun 16 '16 at 21:19
  • @J.F.Sebastian I think I just don't like the non blocking read returning an empty string for EOF, when an empty string would be also a normal case for nothing to read yet. But yeah, `select()` told me there was something, so I should trust it. – rrauenza Jun 16 '16 at 21:21
  • It doesn't matter whether the fd is blocking or not: if you get `b''` from `os.read()` then it is EOF. – jfs Jun 16 '16 at 21:29
  • Ah: `os.read()` would throw `OSError: [Errno 11] Resource temporarily unavailable` if there is nothing to read yet, not `""` – rrauenza Jun 16 '16 at 21:54
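The distinction worked out in the comments above (an error when nothing is available yet, `b''` only at EOF) can be checked with a plain os.pipe(). A minimal sketch, assuming Unix and Python 3.5+ for os.set_blocking(); the variable names are illustrative only:

```python
import os

r, w = os.pipe()
os.set_blocking(r, False)  # make the read end non-blocking

# 1. Nothing written yet, writer still open: the read fails with EAGAIN,
#    which Python 3 raises as BlockingIOError (a subclass of OSError).
try:
    os.read(r, 1024)
    empty_pipe_result = "returned"
except BlockingIOError:
    empty_pipe_result = "BlockingIOError"

# 2. Data is available: the read returns it.
os.write(w, b"hello")
data = os.read(r, 1024)

# 3. Writer closed and pipe drained: the read returns b'', meaning EOF.
os.close(w)
eof = os.read(r, 1024)
os.close(r)

print(empty_pipe_result, data, eof)
```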
0

Here's how to implement "timeout since the subprocess' last output" on Unix in Python 3:

#!/usr/bin/env python3
import os
import selectors
import sys
from subprocess import Popen, PIPE, _PopenSelector as Selector

timeout = 1  # seconds
with Popen([sys.executable, '-c', '''import time
for i in range(10):  # dummy script
    time.sleep(i)
    print(i, flush=True)
'''], stdout=PIPE, stderr=PIPE) as process:
    pipes = {process.stdout: 1, process.stderr: 2}  # where to echo data
    with Selector() as sel:
        for pipe in pipes:
            os.set_blocking(pipe.fileno(), False)
            sel.register(pipe, selectors.EVENT_READ)
        while pipes:
            events = sel.select(timeout)
            if not events:  # timeout
                process.kill()
            for key, mask in events:
                assert mask == selectors.EVENT_READ
                data = os.read(key.fd, 512)
                if data == b'':  # EOF
                    sel.unregister(key.fileobj)
                    del pipes[key.fileobj]
                else:  # echo data
                    os.write(pipes[key.fileobj], data)

Note: the loop is not terminated on process.poll(), so no data is lost; it runs until both pipes reach EOF. The code uses the same selector that the subprocess authors prefer; otherwise sel = selectors.DefaultSelector() could be used. If a grandchild process may inherit the pipes, then you should break the loop on timeout more aggressively (EOF may be delayed). To implement os.set_blocking() before Python 3.5, you could use fcntl:

import os
from fcntl import fcntl, F_GETFL, F_SETFL

def set_nonblocking(fd):
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | os.O_NONBLOCK) # set O_NONBLOCK
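As a sketch of the two variations mentioned in the note, the version below uses selectors.DefaultSelector() instead of the private `_PopenSelector` import and leaves the loop as soon as the timeout fires, rather than waiting for EOF (which a grandchild holding the pipes could delay). The helper name `echo_until_idle` and its boolean return value are assumptions made for illustration; Unix and Python 3.5+ are assumed.

```python
import os
import selectors
import subprocess


def echo_until_idle(cmd, idle_timeout):
    """Echo cmd's stdout/stderr; kill it after idle_timeout seconds of silence.

    Returns True if the process was killed for being idle, else False.
    Hypothetical helper, not part of the subprocess module.
    """
    with subprocess.Popen(cmd, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as process:
        pipes = {process.stdout: 1, process.stderr: 2}  # where to echo data
        with selectors.DefaultSelector() as sel:
            for pipe in pipes:
                os.set_blocking(pipe.fileno(), False)
                sel.register(pipe, selectors.EVENT_READ)
            while pipes:
                events = sel.select(idle_timeout)
                if not events:  # nothing arrived within idle_timeout
                    process.kill()
                    return True  # stop now instead of waiting for EOF
                for key, _mask in events:
                    data = os.read(key.fd, 512)
                    if data == b'':  # EOF on this pipe
                        sel.unregister(key.fileobj)
                        del pipes[key.fileobj]
                    else:  # echo data to our own stdout/stderr
                        os.write(pipes[key.fileobj], data)
    return False
```

Leaving the `with Popen(...)` block closes the parent's pipe ends and waits for the direct child only, so a lingering grandchild does not keep the watchdog blocked.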
Community
jfs