
I need to be able to capture the return value of a function and its stdout/stderr in a threaded context. Currently, I'm evaluating a number of subclasses (my_class_foo, my_class_bar) based on this base class (my_base_class) by invoking the is_true method in each subclass.

I'd like to be able to capture both the return value from each is_true as well as the stdout/stderr. The current solution below works in a non-threaded or non-multiprocess context. However, it relies on redirecting the stdout/stderr which obviously won't work if I'm evaluating more than one of these subclasses in parallel.

I've looked at the concurrent.futures and multiprocess and subprocess packages and can't figure out a solution.

I'm trying to avoid using loggers so that users can just rely on printing to stdout instead of using an explicit method.

I would like to execute the is_true methods from my_class_foo and my_class_bar in parallel and be able to capture the stdout messages along with each class's return value.

import io
import sys


class Capturing(list):
    """
    Context manager for capturing the stdout/stderr of the is_true() function call
    """
    def __enter__(self):
        self._stdout = sys.stdout
        self._stderr = sys.stderr
        sys.stdout = self._stringio_out = io.StringIO()
        sys.stderr = self._stringio_err = io.StringIO()
        return self
    def __exit__(self, *args):
        self.extend(self._stringio_out.getvalue().splitlines())
        self.extend(self._stringio_err.getvalue().splitlines())
        del self._stringio_out
        del self._stringio_err
        sys.stdout = self._stdout
        sys.stderr = self._stderr


class my_base_class(object):

    def is_true(self):
        raise NotImplementedError

    def evaluate_node_is_true(self):
        with Capturing() as is_true_stdout:
            node_is_true = self.is_true()
        # Capturing only fills the list in __exit__, so read it after the with block
        self.output = ''.join(is_true_stdout)


class my_class_foo(my_base_class):
    def is_true(self):
        print('foo')
        return True


class my_class_bar(my_base_class):
    def is_true(self):
        print('bar')
        return False
martineau
blue
  • Does this have to work with threads, or can it work only with processes? Because the latter is a whole lot easier. – abarnert Mar 13 '18 at 04:18
  • Also: "I've looked at the concurrent.futures and multiprocess and subprocess packages…" What did you try? Firing off a `subprocess.run` on `sys.executable` with captured stdio is dead simple, and I don't see why it wouldn't work for you. – abarnert Mar 13 '18 at 04:19
  • By the way, there's been some talk of making `stdout` and friends wrappable in a PEP 567 contextvar once they're tied together properly with subinterpreters. Don't hold your breath—the earliest it could plausibly happen is 3.9, even if they decide to do it—but it would obviously make this problem a lot easier. – abarnert Mar 13 '18 at 04:41
  • If I can get this to work with processes, then I'll be satisfied. The problem with processes is that I don't know how to get the return value on the sys.executable in your answer below. Is there a way to do that? Thanks in advance. – blue Mar 13 '18 at 05:51
  • Well, a subprocess can't return anything except an 8-bit integer, so you'll need to find some other way to pass it—a special final line in stdout, or a pipe that you pass down as an extra fd, etc. – abarnert Mar 13 '18 at 06:04
  • Given that the return value on my function is just a bool, I could override the return code with my own setting, i.e. true = 100 and false = 101. This way I could get stdout/stderr and the return value of the sys.executable. Would I be opening myself up to any other issues if I tried this when using subprocess? – blue Mar 13 '18 at 17:13
  • Actually, if the return value is a bool, you might want to use `0` for either true (or whichever of true/false best matches "success", if it's more about success than truth) and 1 for false (or maybe some other value between 2 and 127 for whichever one best matches "failure"). – abarnert Mar 13 '18 at 17:38
  • That's what I'll do. Thanks for all the help. – blue Mar 13 '18 at 22:19

1 Answer


The simplest solution is probably subprocess. Of course this won't work if you want to share any data with the children. And it will require writing a simple separate driver script for the subprocess instead of relying on the same script as the main program. But if it works, it's as simple as this:

res = subprocess.run([sys.executable, driver_script_name],
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE)

And then you've got res.stdout and res.stderr to read from.

To do multiple children concurrently, the simplest solution is to fire off a thread for each subprocess.run. If you want a pool of only, say, 8 at a time, use a ThreadPoolExecutor instead.
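A minimal sketch of that pattern, assuming Python 3.7+ for `text=True`. The helper name `run_driver` and the inline `-c` snippets are hypothetical stand-ins for real driver scripts:

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def run_driver(args):
    """Run one child interpreter and return (returncode, stdout, stderr)."""
    res = subprocess.run([sys.executable, *args],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         text=True)
    return res.returncode, res.stdout, res.stderr


# Hypothetical stand-ins for driver scripts; real code would pass script paths.
jobs = [
    ["-c", "print('foo')"],
    ["-c", "import sys; print('bar'); print('oops', file=sys.stderr)"],
]

# At most 8 children run at once; map() yields results in job order.
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(run_driver, jobs))
```

Each entry in `results` belongs to exactly one child, so output from concurrent children cannot get mixed up. The threads spend their time waiting on child processes, so the GIL should not be a bottleneck here.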

The biggest problem with subprocess for a problem like yours is that you want stdout, and stderr, and a return value. Processes give you stdout, stderr, and a return code, but that code is just an 8-bit number.

In your case (based on your comments), the return value is actually a bool, so that may be fine. There's a quasi-standard for Unix command-line tools that do boolean things to return 0 for true and 1 for false—as with the true and false tools.

This is backward from what you'd probably expect, but it fits in with the more general quasi-standard that 0 means success, 1 means general error, 2 means parameter error, 3-127 are tool-specific errors, and 128-255 are avoided because shells report a process killed by signal N as exit status 128+N. If you (or your users) might ever want to test your sub-programs from the shell, go with 0 for true.
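A sketch of the 0-for-true convention from the parent's side. The inline `DRIVER` source and the helper `run_is_true` are hypothetical; a real driver would import and call the actual is_true implementation:

```python
import subprocess
import sys

# Hypothetical driver: prints as is_true() would, then exits 0 for True, 1 for False.
DRIVER = """
import sys

def is_true():
    print('bar')
    return False

sys.exit(0 if is_true() else 1)
"""


def run_is_true(driver_source):
    """Return (value, stdout, stderr) for one child; raise on any other exit code."""
    res = subprocess.run([sys.executable, "-c", driver_source],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         text=True)
    if res.returncode not in (0, 1):
        # e.g. sys.exit(2), or death by signal (subprocess reports that as a negative code)
        raise RuntimeError(f"driver failed with code {res.returncode}: {res.stderr}")
    return res.returncode == 0, res.stdout, res.stderr


value, out, err = run_is_true(DRIVER)
```

One caveat with this mapping: Python itself exits with status 1 on an uncaught exception, so a driver that crashes looks the same as one that returned False. If that distinction matters, the driver can catch exceptions and exit with a different code.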


You can also do this with multiprocessing, but it's a bit trickier. You can create the pipes manually and dup them over stdio, but that's hard to get right. Worse, I don't think the way to do it is documented, so you'll have to dig into the source for multiprocessing to do it.


In fact, at least if you're on *nix, it might be easier to manually fork in that case. Sure, it's low-level, and there's lots of room for error—but then (assuming you know Unix fork well enough) you know exactly what to do and where.
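A rough sketch of the manual-fork approach, under several assumptions: Unix only, Python 3.9+ for `os.waitstatus_to_exitcode`, a bool return value passed back as the exit code, and stdout and stderr merged into one pipe for brevity. The name `run_forked` is hypothetical:

```python
import os
import sys


def run_forked(func):
    """Run func() in a forked child; return (bool_result, captured_output)."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: point fds 1 and 2 at the pipe, then rebuild the Python-level streams.
        code = 2  # stays 2 if anything below raises
        try:
            os.close(read_fd)
            os.dup2(write_fd, 1)
            os.dup2(write_fd, 2)
            os.close(write_fd)
            sys.stdout = open(1, "w", closefd=False)
            sys.stderr = open(2, "w", closefd=False)
            code = 0 if func() else 1
            sys.stdout.flush()
            sys.stderr.flush()
        finally:
            os._exit(code)  # exit immediately, skipping the parent's cleanup handlers
    # Parent: close our copy of the write end so read() sees EOF when the child exits.
    os.close(write_fd)
    with open(read_fd, "r") as pipe:
        output = pipe.read()
    _, status = os.waitpid(pid, 0)
    code = os.waitstatus_to_exitcode(status)
    if code not in (0, 1):
        raise RuntimeError(f"child failed with code {code}: {output}")
    return code == 0, output


def is_true():
    print("foo")
    return True


value, output = run_forked(is_true)
```

The parent reads the pipe to EOF before calling `waitpid`, so a child that prints more than the pipe buffer holds does not block forever. Capturing stderr separately would need a second pipe and some way to read both without deadlocking.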


A simpler option for multiprocessing is to do what this answer demonstrates and just have each child redirect its stdout to a file, which the parent can then read later.


One option that will work even for threads is to replace sys.stdout with a custom file-like object that appends to a thread-specific buffer. You could use thread-local data and then copy it back at the end, but it might be simpler to just use a dict keyed by thread id. Something like this:

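import io
import threading

class ThreadedStdWriter(io.RawIOBase):
    buffers = {}  # thread id -> list of bytes chunks written by that thread
    def writable(self):
        return True
    def write(self, b):
        # File the data under whichever thread is writing, creating its list on first use
        ThreadedStdWriter.buffers.setdefault(threading.get_ident(), []).append(bytes(b))
        return len(b)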
… and then wrap it in a BufferedWriter and a TextIOWrapper and store the result as sys.stdout, and then do the same for sys.stderr. Then, the stdout of any thread is just b''.join(ThreadedStdWriter.buffers[thread_id]).
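A text-level variant of the same idea, sketched under some assumptions. It subclasses `io.TextIOBase` instead of wrapping a raw stream, because buffering layers can hold data written by one thread and hand it to the raw `write` while a different thread is running, which would file it under the wrong id. The names `PerThreadText`, `take`, and `capture` are hypothetical, and only stdout is replaced here:

```python
import io
import sys
import threading


class PerThreadText(io.TextIOBase):
    """Text stream that files each write under the id of the calling thread."""

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        self.buffers = {}  # thread id -> list of str chunks

    def writable(self):
        return True

    def write(self, s):
        with self._lock:
            self.buffers.setdefault(threading.get_ident(), []).append(s)
        return len(s)

    def take(self):
        """Remove and return everything the calling thread has written so far."""
        with self._lock:
            return "".join(self.buffers.pop(threading.get_ident(), []))


def capture(func, stream, results, key):
    """Thread target: run func, then record (return value, this thread's output)."""
    value = func()
    # Popping the buffer matters because thread ids can be reused after a thread exits.
    results[key] = (value, stream.take())


def foo():
    print("foo")
    return True


def bar():
    print("bar")
    return False


stream = PerThreadText()
results = {}
old_stdout, sys.stdout = sys.stdout, stream
try:
    threads = [threading.Thread(target=capture, args=(f, stream, results, f.__name__))
               for f in (foo, bar)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
finally:
    sys.stdout = old_stdout
```

This only sees output that goes through `sys.stdout` at the Python level. Anything written straight to file descriptor 1, for example by a C extension or a child process, bypasses it.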

abarnert