If the native code writes a large amount of data (more than the pipe buffer can hold), it blocks until you free up space in the pipe by reading from it.
The solution from lunixbochs, however, only starts reading the pipe after the native call has finished, so it cannot handle that case. I improved the solution so that it reads the pipe in parallel from a separate thread. This way you can capture output of any size.
This solution is also inspired by https://stackoverflow.com/a/16571630/1076564 and captures both stdout and stderr:
class CtypesStdoutCapture(object):
    """Context manager capturing everything written to file descriptors 1 and 2.

    While the ``with`` block runs, fd 1 (stdout) and fd 2 (stderr) are
    redirected into two pipes. One reader thread per pipe drains it
    concurrently, so a writer never blocks on a full pipe regardless of how
    much it emits. On exit the original descriptors are restored, the
    remaining data is drained and every descriptor opened here is closed.

    Attributes:
        text: Decoded data written to fd 1 (complete after the block exits).
        err: Decoded data written to fd 2 (complete after the block exits).

    NOTE: output held in a C stdio buffer inside the native library only
    reaches the pipe once that library flushes it (e.g. ``fflush(stdout)``);
    this class flushes the Python-level streams only.
    """

    # Bytes requested per os.read() call.
    _CHUNK_SIZE = 65536
    # Seconds a reader thread waits in select() before re-checking the stop flag.
    _POLL_INTERVAL = 0.01

    def __enter__(self):
        """Redirect fd 1/2 into fresh pipes and start the reader threads.

        Returns:
            This instance, so it can be bound with ``as``.
        """
        import codecs

        # Text Python already buffered belongs to the real stdout/stderr.
        self._flush_python_streams()

        self.text = ""
        self.err = ""
        self._stop = False

        self._pipe_out, self._pipe_in = os.pipe()
        self._err_pipe_out, self._err_pipe_in = os.pipe()
        # Keep duplicates of the real descriptors so they can be restored.
        self._stdout = os.dup(1)
        self._stderr = os.dup(2)

        # Incremental decoders keep a multi-byte character intact even when
        # it is split across two separate reads.
        make_decoder = codecs.getincrementaldecoder("utf-8")
        self._decoders = {
            self._pipe_out: make_decoder(errors="replace"),
            self._err_pipe_out: make_decoder(errors="replace"),
        }

        # Replace stdout/stderr with the write ends of our pipes.
        os.dup2(self._pipe_in, 1)
        os.dup2(self._err_pipe_in, 2)

        self._read_thread = threading.Thread(
            target=self._read, args=("text", self._pipe_out)
        )
        self._read_err_thread = threading.Thread(
            target=self._read, args=("err", self._err_pipe_out)
        )
        self._read_thread.start()
        self._read_err_thread.start()
        return self

    def __exit__(self, *args):
        """Stop the readers, restore fd 1/2, drain the pipes and close them.

        Exceptions raised inside the ``with`` block are not suppressed.
        """
        # Text printed from Python inside the block may still be buffered;
        # push it into the pipes while they are still installed.
        self._flush_python_streams()

        self._stop = True
        self._read_thread.join()
        self._read_err_thread.join()
        try:
            # Put stdout/stderr back in place.
            os.dup2(self._stdout, 1)
            os.dup2(self._stderr, 2)
            # Collect whatever arrived after the readers' last pass.
            self.text += self._drain(self._pipe_out)
            self.err += self._drain(self._err_pipe_out)
        finally:
            descriptors = (
                self._pipe_out,
                self._pipe_in,
                self._err_pipe_out,
                self._err_pipe_in,
                self._stdout,
                self._stderr,
            )
            for fd in descriptors:
                try:
                    os.close(fd)
                except OSError:
                    # Best-effort cleanup: one bad descriptor must not
                    # prevent the others from being closed.
                    pass

    @staticmethod
    def _flush_python_streams():
        """Flush ``sys.stdout`` and ``sys.stderr`` when they exist."""
        import sys

        for stream in (sys.stdout, sys.stderr):
            if stream is not None:
                stream.flush()

    def _drain(self, pipe):
        """Read the rest of *pipe* and finalize its incremental decoder."""
        remaining = self.read_pipe(pipe)
        # final=True emits a replacement for a truncated trailing sequence.
        return remaining + self._decoders[pipe].decode(b"", final=True)

    def more_data(self, pipe, timeout=0):
        """Return True if *pipe* is readable within *timeout* seconds.

        Args:
            pipe: File descriptor of the read end of a pipe.
            timeout: Seconds to wait; the default ``0`` polls without blocking.
        """
        readable, _, _ = select.select([pipe], [], [], timeout)
        return bool(readable)

    def read_pipe(self, pipe):
        """Read everything currently available on *pipe* without blocking.

        Args:
            pipe: File descriptor of the read end of a pipe.

        Returns:
            The available data decoded as UTF-8 text (undecodable bytes are
            replaced), or ``""`` when nothing is pending.
        """
        chunks = []
        while self.more_data(pipe):
            chunk = os.read(pipe, self._CHUNK_SIZE)
            if not chunk:
                # Empty read means EOF; select() would keep reporting the
                # descriptor as readable, so stop instead of spinning.
                break
            chunks.append(chunk)
        data = b"".join(chunks)
        decoder = getattr(self, "_decoders", {}).get(pipe)
        if decoder is None:
            # Descriptor not managed by this instance: decode in one shot.
            return data.decode("utf-8", errors="replace")
        return decoder.decode(data)

    def _read(self, attr_name, pipe):
        """Reader-thread body: append data from *pipe* to attribute *attr_name*.

        Runs until ``__exit__`` sets the stop flag. Waiting inside select()
        with a short timeout replaces sleep-based polling.
        """
        while not self._stop:
            if self.more_data(pipe, self._POLL_INTERVAL):
                captured = getattr(self, attr_name) + self.read_pipe(pipe)
                setattr(self, attr_name, captured)

    def __str__(self):
        """Return the captured stdout text."""
        return self.text
# Usage:
# The class must be instantiated: the ``with`` statement needs an instance
# providing __enter__/__exit__, not the class object itself.
with CtypesStdoutCapture() as capture:
    lib.native_fn()
# The captured output is complete once the block has exited.
print(capture.text)
print(capture.err)