
Under Python 3.4 on Windows, I need to stream data written to stdout/stderr by a child process, i.e. receive its output as it occurs, using the asyncio framework introduced in Python 3.4. I also have to determine the program's exit code afterwards. How can I do this?

aknuds1

4 Answers


The solution I've come up with so far uses SubprocessProtocol to receive output from the child process, and the associated transport to get the process' exit code. I don't know if this is optimal though. I've based my approach on an answer to a similar question by J.F. Sebastian.

import asyncio
import contextlib
import os
import locale


class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1:
            name = 'stdout'
        elif fd == 2:
            name = 'stderr'
        text = data.decode(locale.getpreferredencoding(False))
        print('Received from {}: {}'.format(name, text.strip()))

    def process_exited(self):
        # NOTE: this may fire before the final pipe data has been received;
        # see the correction in ymmt2005's answer below
        loop.stop()


if os.name == 'nt':
    # On Windows, the ProactorEventLoop is necessary to listen on pipes
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
with contextlib.closing(loop):
    # This starts the child process and connects the protocol; it does not wait for exit
    transport = loop.run_until_complete(loop.subprocess_exec(
        SubprocessProtocol, 'python', '-c', 'print(\'Hello async world!\')'))[0]
    # Wait until process has finished
    loop.run_forever()
    print('Program exited with: {}'.format(transport.get_returncode()))
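Running this script (with `python` on the PATH) should print roughly:

    Received from stdout: Hello async world!
    Program exited with: 0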
aknuds1

I suggest using the high-level API:

# stdout/stderr must be PIPEs for communicate() to capture them
proc = yield from asyncio.create_subprocess_exec(
    'python', '-c', 'print(\'Hello async world!\')',
    stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)

stdout, stderr = yield from proc.communicate()

retcode = proc.returncode

You can also do more:

# these require stdin/stdout/stderr=asyncio.subprocess.PIPE at creation
proc.stdin.write(b'data')  # StreamWriter.write() is a plain call, not a coroutine
yield from proc.stdin.drain()

stdout = yield from proc.stdout.read()
stderr = yield from proc.stderr.read()

retcode = yield from proc.wait()

and so on.

But please keep in mind that waiting on, say, stdout when the child process prints nothing can hang your coroutine.
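For completeness, here is a minimal runnable sketch (not from the original answer) that drives the first snippet with the event loop, using Python 3.4's `@asyncio.coroutine`/`yield from` style; as in the accepted answer, Windows needs a `ProactorEventLoop` for pipes:

    import asyncio
    import os


    @asyncio.coroutine
    def run():
        proc = yield from asyncio.create_subprocess_exec(
            'python', '-c', 'print(\'Hello async world!\')',
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
        # communicate() reads both pipes to EOF and waits for the process
        stdout, stderr = yield from proc.communicate()
        print(stdout.decode().strip())
        return proc.returncode


    if os.name == 'nt':
        loop = asyncio.ProactorEventLoop()
        asyncio.set_event_loop(loop)
    else:
        loop = asyncio.get_event_loop()
    print('Exit code: {}'.format(loop.run_until_complete(run())))
    loop.close()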

Andrew Svetlov
    None of these techniques are non-blocking though? I need to handle output on both stdout and stderr as it arrives, not after the child exits. – aknuds1 Jun 26 '14 at 21:22
  • Can the second example deadlock if the child process generates enough output on stderr? – jfs Dec 26 '14 at 00:03
  • Yes, it can. You may create separate reader coroutines for both stdout and stderr and execute those in parallel with `asyncio.gather()`, for example; see the sketch below. – Andrew Svetlov Dec 26 '14 at 19:43
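A minimal sketch of that suggestion (hypothetical `read_stream`/`run` helpers, not code from the thread), reading both pipes in parallel so neither can fill up and block the child:

    @asyncio.coroutine
    def read_stream(stream, name):
        while True:
            line = yield from stream.readline()
            if not line:
                break  # EOF: the pipe was closed
            print('{}: {}'.format(name, line.decode().rstrip()))


    @asyncio.coroutine
    def run(*cmd):
        proc = yield from asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
        # Drain stdout and stderr concurrently, then collect the exit code
        yield from asyncio.gather(read_stream(proc.stdout, 'stdout'),
                                  read_stream(proc.stderr, 'stderr'))
        return (yield from proc.wait())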

Since the event loop may observe and report the process exit before the remaining stdout/stderr data has been read, we need to wait for the pipe close events in addition to the process exit event.

This is a correction to aknuds1's answer:

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def __init__(self):
        self._exited = False
        self._closed_stdout = False
        self._closed_stderr = False

    @property
    def finished(self):
        return self._exited and self._closed_stdout and self._closed_stderr

    def signal_exit(self):
        # Stop the loop only once the process has exited and both pipes are closed
        if not self.finished:
            return
        loop.stop()

    def pipe_data_received(self, fd, data):
        if fd == 1:
            name = 'stdout'
        elif fd == 2:
            name = 'stderr'
        text = data.decode(locale.getpreferredencoding(False))
        print('Received from {}: {}'.format(name, text.strip()))

    def pipe_connection_lost(self, fd, exc):
        if fd == 1:
            self._closed_stdout = True
        elif fd == 2:
            self._closed_stderr = True
        self.signal_exit()

    def process_exited(self):
        self._exited = True
        self.signal_exit()
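To use the corrected protocol, wire it into the event loop exactly as in the accepted answer (same imports and loop setup); only the protocol class changes:

    with contextlib.closing(loop):
        transport = loop.run_until_complete(loop.subprocess_exec(
            SubprocessProtocol, 'python', '-c', 'print(\'Hello async world!\')'))[0]
        loop.run_forever()  # signal_exit() stops the loop once all events have arrived
        print('Program exited with: {}'.format(transport.get_returncode()))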
ymmt2005

I'm on Python 3.9, so I'm not sure whether this works in earlier versions. With asyncio, simply create a subprocess, create a task to handle its stdout, and then await the subprocess. Here I'm running the command with a 120 s timeout:

import asyncio

async def _handle_stdout(stdout: asyncio.StreamReader):
    while True:
        await asyncio.sleep(0)
        data = await stdout.readline()
        if not data:
            break  # EOF: the pipe was closed
        line = data.decode('ascii')
        print(line)  # or send to file or wherever

async def _execute(cmd) -> asyncio.subprocess.Process:
    proc: asyncio.subprocess.Process = await asyncio.create_subprocess_shell(
        cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
    asyncio.create_task(_handle_stdout(proc.stdout))
    # stderr is piped too; drain it as well so the child cannot block on a full pipe
    asyncio.create_task(_handle_stdout(proc.stderr))
    await asyncio.wait_for(proc.wait(), timeout=120)
    return proc

def run_my_command():
    cmd = "some-command-to-run.sh"
    process = asyncio.run(_execute(cmd))

Note that the `await asyncio.sleep(0)` statement is there so that the `_handle_stdout` coroutine yields control back to the event loop.

Dan Alvizu