
I want to implement a futures::Stream for reading and parsing the standard output of a child subprocess.

What I'm doing at the moment:

  • spawn the subprocess and obtain its stdout via std::process methods: let child = Command::new(...).stdout(Stdio::piped()).spawn().expect(...)

  • add AsyncRead and BufRead to stdout:

    let stdout = BufReader::new(tokio_io::io::AllowStdIo::new(
        child.stdout.expect("Failed to open stdout"),
    ));
    
  • declare a wrapper struct for stdout:

    struct MyStream<Io: AsyncRead + BufRead> {
        io: Io,
    }
    
  • implement Stream:

    impl<Io: AsyncRead + BufRead> Stream for MyStream<Io> {
        type Item = Message;
        type Error = Error;
    
        fn poll(&mut self) -> Poll<Option<Message>, Error> {
            let mut line = String::new();
            let n = try_nb!(self.io.read_line(&mut line));
            if n == 0 {
                return Ok(None.into());
            }
            //...read & parse further
        }
    }
    

The problem is that AllowStdIo doesn't make ChildStdout magically asynchronous and the self.io.read_line call still blocks.

I guess I need to pass something other than Stdio::piped() to make it asynchronous, but what? Or is there a different solution?

This question is different from "What is the best approach to encapsulate blocking I/O in future-rs?" because I want asynchronous I/O for the specific case of a subprocess, not a general way to encapsulate synchronous I/O.

Update: I'm using tokio = "0.1.3" to leverage its runtime feature and using tokio-process is not an option at the moment (https://github.com/alexcrichton/tokio-process/issues/27)

Shepmaster

2 Answers


The tokio-process crate provides you with a CommandExt trait that allows you to spawn a command asynchronously.

The resulting Child has a getter for ChildStdout which implements Read and is non-blocking.

Wrapping tokio_process::ChildStdout into AllowStdIo as you did in your example should make it work!

2023 Update

tokio_process::CommandExt has been deprecated in favor of tokio::process::Command, which you can use in a similar way.
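
If neither crate is an option (the question's update rules out tokio-process with tokio 0.1.3), one possible fallback is to keep the blocking read off the event loop. This is not true asynchronous I/O, only a sketch of a workaround: a dedicated thread does the blocking read_line calls and forwards each line over a channel, so the consumer can check for new lines without blocking. It uses only the standard library; spawn_line_reader and spawn_with_lines are hypothetical helper names, not part of any crate.

```rust
use std::io::{self, BufRead, BufReader, Read};
use std::process::{Child, Command, Stdio};
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical helper: reads `source` line by line on a dedicated thread
/// and forwards every line (or read error) over a channel.
fn spawn_line_reader<R>(source: R) -> Receiver<io::Result<String>>
where
    R: Read + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The blocking reads happen here, not on the caller's thread.
        for line in BufReader::new(source).lines() {
            // Stop early if the receiving side has been dropped.
            if tx.send(line).is_err() {
                break;
            }
        }
        // `tx` is dropped when the thread ends, which closes the channel
        // and tells the receiver that the output is finished.
    });
    rx
}

/// Hypothetical helper: spawns `command` with a piped stdout and hands the
/// pipe to the reader thread.
fn spawn_with_lines(mut command: Command) -> io::Result<(Child, Receiver<io::Result<String>>)> {
    let mut child = command.stdout(Stdio::piped()).spawn()?;
    let stdout = child.stdout.take().expect("stdout was configured as piped");
    Ok((child, spawn_line_reader(stdout)))
}
```

On the consuming side, rx.try_recv() returns immediately: Ok(line) when a line is ready, Err(TryRecvError::Empty) when nothing has arrived yet, and Err(TryRecvError::Disconnected) once the child has closed its stdout. To get an actual Stream, the same thread could presumably feed a futures-aware channel instead of std::sync::mpsc; that part is an assumption and is not shown here.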

Ivan Gabriele
  • Thanks! I think it will not work well with new tokio yet (https://github.com/alexcrichton/tokio-process/issues/27) which I want to use to leverage its runtime feature. I'll update my question with that clarification. – Ruslan Prakapchuk Mar 13 '18 at 00:26

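Here is my version using tokio::process. It reads stdout and stderr line by line and stops when the child exits:

    use std::process::Stdio;
    use tokio::io::{AsyncBufReadExt, BufReader};
    use tokio::process::Command;

    // Fragment: runs inside an async fn on a tokio runtime.
    // `args.run` and `parameters` come from the surrounding program.
    let mut child = match Command::new(&args.run[0])
        .args(parameters)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .kill_on_drop(true)
        .spawn()
    {
        Ok(c) => c,
        Err(e) => panic!("Unable to start process `{}`. {}", args.run[0], e),
    };

    let stdout = child.stdout.take().expect("child did not have a handle to stdout");
    let stderr = child.stderr.take().expect("child did not have a handle to stderr");

    let mut stdout_reader = BufReader::new(stdout).lines();
    let mut stderr_reader = BufReader::new(stderr).lines();

    // Once a pipe reaches EOF, next_line() keeps returning Ok(None) immediately,
    // so stop polling it instead of spinning in the loop.
    let mut stdout_done = false;
    let mut stderr_done = false;

    loop {
        tokio::select! {
            result = stdout_reader.next_line(), if !stdout_done => {
                match result {
                    Ok(Some(line)) => println!("Stdout: {}", line),
                    Ok(None) => stdout_done = true,
                    Err(_) => break,
                }
            }
            result = stderr_reader.next_line(), if !stderr_done => {
                match result {
                    Ok(Some(line)) => println!("Stderr: {}", line),
                    Ok(None) => stderr_done = true,
                    Err(_) => break,
                }
            }
            result = child.wait() => {
                if let Ok(status) = result {
                    println!("Child process exited with {}", status);
                }
                // Child process exited. Lines still buffered in the pipes
                // at this point are not printed.
                break;
            }
        }
    }
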
Mr.Wang from Next Door