0

I have a subprocess, which may or may not write something to it's stdout in a specific amount of time, e.g. 3 seconds.

If a new line in the subprocess stdout starts with the correct thing, I want to return the line. Optimally I would like to realize something like this:

use std::io::{BufRead, BufReader};
use std::thread;
use std::time::Duration;

pub fn wait_for_or_exit(
    reader: &BufReader<&mut std::process::ChildStdout>,
    wait_time: u64,
    cmd: &str,
) -> Option<String> {
    let signal: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
    let signal_clone = signal.clone();
    let child = thread::spawn(move || {
        thread::sleep(Duration::from_millis(wait_time));
        signal_clone.store(true, Ordering::Relaxed);
    });
    let mut line = String::new();
    while !signal.load(Ordering::Relaxed) {
        //Sleep a really small amount of time not to block cpu
        thread::sleep(Duration::from_millis(10));
        //This line is obviously invalid!
        if reader.has_input() {
            line.clear();
            reader.read_line(&mut line).unwrap();
            if line.starts_with(cmd) {
                return Some(line);
            }
        }
    }
    None
}

The only line not working here is reader.has_input().

Obviously, if the subprocess answers much faster than the wait_time for a repeated amount of times, there will be a lot of sleeping threads, but I can take care of that with channels.

Fabian v.d.W
  • 155
  • 12

1 Answers1

2

There are two approaches.

  1. You can spin up a separate thread, and then use some mechanism (probably a channel) to signal success or failure to your waiting thread.
  2. You can use async IO as you mentioned, such as the futures and tokio lib.

I'll demo both. I prefer the futures/Tokio approach, but if you're not familiar with the futures model, then option one might be better.

The Rust stdlib has a Channels API, and this channel actually features a recv_timeout which can help us out quite a bit.

use std::thread;
use std::time::Duration;
use std::sync::mpsc;

// this spins up a separate thread in which to wait for stuff to read
// from the BufReader<ChildStdout> 
// If we successfully read, we send the string over the Channel.
// Back in the original thread, we wait for an answer over the channel
// or timeout in wait_time secs. 
pub fn wait_for_or_exit(
    reader: &BufReader<&mut std::process::ChildStdout>,
    wait_time: u64,
    cmd: &str,
) -> Option<String> {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        let line = reader.read_line();
        sender.send(line);
    });

    match receiver.recv_timeout(Duration::from_secs(wait_time)) {
        Ok(line) => if line.starts_with(cmd) 
           { Some(line) } else 
           { None },
        Err(mpsc::RecvTimeoutError::Timeout) => None,
        Err(mpsc::RecvTimeoutError::Disconnected) => None  

    }
}

Option two assumes that you're building a future's based app. In order to accomplish what you want using Async IO is a file descriptor that will let us set NON_BLOCKING. Luckily we don't have to do that ourselves. The Futures and Tokio APIs handle this nicely. The trade-off, is that you have to compose your code out of non-blocking futures.

The code below was taken almost entirely from Tokio Process with a Futures timeout that comes from the Tokio API.

extern crate futures;
extern crate tokio;
extern crate tokio_process;

use std::process::Command;
use std::time::{Duration};

use futures::Future;
use tokio_process::CommandExt;
use tokio::prelude::*;

const TIMEOUT_SECS: u64 = 3;

fn main() {
    // Like above, but use `output_async` which returns a future instead of
    // immediately returning the `Child`.
    let output = Command::new("echo").arg("hello").arg("world")
                        .output_async();

    let future = output.map_err(|e| panic!("failed to collect output: {}", e))
        .map(|output| {
            assert!(output.status.success());
            assert_eq!(output.stdout, b"hello world\n");
            println!("received output: {}",     String::from_utf8(output.stdout).unwrap());
        })
        .timeout(Duration::from_secs(TIMEOUT_SECS)) // here is where we say we only want to wait TIMETOUT seconds
        .map_err(|_e| { println!("Timed out waiting for data"); });

    tokio::run(future);
}
Eigenrick
  • 66
  • 1
  • 4
  • I can't get option one to compile. In `thread::spawn` it complains about reader needing a static lifetime. However, I don't think option one is viable anyway, since the underlying child process will be killed and restarted several hundred times and it is not guaranteed that it will answer. This would result in several hundred listener threads, wouldn't it? I am trying option two now. – Fabian v.d.W Apr 23 '19 at 08:54