3

I'm trying to port to Rust this Python script, which sends input to a helper process and reads back its output:

import subprocess

# 4 MiB of '?' (0x3f) bytes. A bytes literal behaves the same on Python 2 and
# also works on Python 3, where communicate() on binary pipes requires bytes.
data = b'\x3f' * 1024 * 4096
# communicate() writes stdin and drains stdout together, so it does not
# deadlock even though the payload is much larger than a pipe buffer.
child = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
output, _ = child.communicate(data)
assert output == data

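My attempt works fine until the input exceeds 64 KiB, presumably because the OS pipe buffers fill up before all of the input has been written: nothing reads `cat`'s stdout while `write_all` is still running, so `cat` blocks writing its output, stops reading its input, and `write_all` never returns.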

use std::io::Write;

// Reproduces the deadlock: the whole 4 MiB payload is written to `cat` before
// anything is read back from it.
const DATA: [u8; 1024 * 4096] = [0x3f; 1024 * 4096];

fn main() {
    let mut child = std::process::Command::new("cat")
                        .stdout(std::process::Stdio::piped())
                        .stdin(std::process::Stdio::piped())
                        .spawn()
                        .unwrap();
    match child.stdin {
        Some(ref mut stdin) => {
            // Blocks forever for large inputs: nothing drains `cat`'s stdout
            // until `wait_with_output` below, so once that pipe is full `cat`
            // stops reading its stdin, the stdin pipe fills as well, and
            // `write_all` can never finish.
            match stdin.write_all(&DATA[..]) {
                // `write_all` returns `Ok(())`; there is no byte count here.
                Ok(_size) => {}
                // NOTE(review): `panic!` with a non-string payload is rejected
                // in the 2021 edition; `panic!("{}", err)` works everywhere.
                Err(err) => panic!(err),
            }
        }
        None => unreachable!(),
    }
    // `wait_with_output` closes the child's stdin (so `cat` sees EOF) and then
    // collects all of its stdout. Dropping the handle, e.g.
    // `drop(child.stdin.take())`, would also close stdin.
    let res = child.wait_with_output();
    assert_eq!(res.unwrap().stdout.len(), DATA.len())
}

Is there a subprocess.communicate equivalent in Rust? Maybe a select equivalent? Can mio be used to solve this problem? Also, there seems to be no way to close stdin.

The goal here is to make a high performance system, so I want to avoid spawning a thread per task.

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
hellcatv
  • 573
  • 4
  • 21
  • 1
    - "*so I want to avoid spawning a thread per task*" - `Popen.communicate` does in fact use threads when working with more than two pipes: http://stackoverflow.com/a/12965273/257568. So, in your case, can the communication be limited to two pipes? – ArtemGr May 09 '16 at 10:34
  • @ArtemGr Do you mean **less** than two pipes? The comment in the linked question says "If we are only using one pipe, or no pipe at all". – Shepmaster May 09 '16 at 12:55
  • @Shepmaster Right, sorry, if there's less than one pipe then threading or non-blocking I/O is unnecessary. – ArtemGr May 09 '16 at 16:34
  • @ArtemGr lol, *less than one* would only be zero pipes, and *any* I/O would be unnecessary ;-) – Shepmaster May 09 '16 at 16:57
  • Popen.communicate does not use threads on OSX or linux: instead it uses poll (see def _communicate_with_poll(self, input) in https://svn.python.org/projects/stackless/trunk/Lib/subprocess.py ) – hellcatv May 09 '16 at 17:14
  • - "*instead it uses poll*" - So... Async IO is not yet integrated into the Rust standard library (https://github.com/rust-lang/rfcs/issues/1081) so your best bet for something like `poll` is an external crate, like `mio`. – ArtemGr May 09 '16 at 17:48

2 Answers

0

It took a fair amount of code. I needed both mio and nix: mio would not put the pipes' file descriptors (the `AsRawFd` items) into non-blocking mode, so that had to be done first with `fcntl` from nix.

Here's the result

extern crate mio;
extern crate bytes;

use mio::*;
use std::io;
use mio::unix::{PipeReader, PipeWriter};
use std::process::{Command, Stdio};
use std::os::unix::io::AsRawFd;
use nix::fcntl::FcntlArg::F_SETFL;
use nix::fcntl::{fcntl, O_NONBLOCK};
extern crate nix;

/// State for one `communicate`-style exchange with a child process, driven
/// by the mio event loop.
struct SubprocessClient {
    /// Bytes to send to the child.
    input: Vec<u8>,
    /// How many bytes of `input` have been written so far.
    input_offset: usize,
    /// Bytes read back from the child so far.
    output: Vec<u8>,
    /// Scratch buffer for a single read from `stdout`.
    buf: [u8; 65536],
    /// Write end connected to the child's stdin.
    stdin: PipeWriter,
    /// Read end connected to the child's stdout.
    stdout: PipeReader,
}


// Readiness callbacks for the child's pipes: `writable` pushes pending input
// into the child's stdin and `readable` collects what the child has written
// to its stdout.
impl SubprocessClient {
    /// Creates a client that will send a copy of `data` to `stdin` and
    /// collect everything read from `stdout`.
    fn new(stdin: PipeWriter, stdout : PipeReader, data : &[u8]) -> SubprocessClient {
        SubprocessClient {
            stdin: stdin,
            stdout: stdout,
            output: Vec::new(),
            buf: [0; 65536],
            input: data.to_vec(),
            input_offset: 0,
        }
    }

    /// Handles a readable event on the child's stdout.
    ///
    /// Appends whatever one non-blocking read returns to `self.output`.
    /// A zero-byte read means the child closed its stdout before the loop
    /// finished; the loop is then shut down instead of being woken again and
    /// again by the end-of-file condition.
    ///
    /// # Errors
    /// Returns any read error other than "would block" (which `try_read`
    /// reports as `Ok(None)`).
    fn readable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
        println!("client socket readable");

        match self.stdout.try_read(&mut self.buf[..]) {
            Ok(None) => println!("CLIENT : spurious read wakeup"),
            Ok(Some(0)) => {
                println!("CLIENT : child closed its stdout");
                event_loop.shutdown();
            }
            Ok(Some(r)) => {
                println!("CLIENT : We read {} bytes!", r);
                self.output.extend_from_slice(&self.buf[..r]);
            }
            Err(e) => return Err(e),
        }
        Ok(())
    }

    /// Handles a writable event on the child's stdin.
    ///
    /// Writes as much of the remaining input as the pipe accepts and stops
    /// the event loop once all of it has been written. Output the child has
    /// not produced yet is left for the caller to collect.
    ///
    /// # Errors
    /// A write error (e.g. the child exited and the pipe is broken) shuts the
    /// loop down and is returned. Previously it was only printed, and with a
    /// level-triggered registration the same failing write would likely be
    /// retried on every poll.
    fn writable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
        println!("client socket writable");

        match self.stdin.try_write(&self.input[self.input_offset..]) {
            Ok(None) => println!("client flushing buf; WOULDBLOCK"),
            Ok(Some(r)) => {
                println!("CLIENT : we wrote {} bytes!", r);
                self.input_offset += r;
            }
            Err(e) => {
                println!("client write error={:?}", e);
                event_loop.shutdown();
                return Err(e);
            }
        }
        if self.input_offset == self.input.len() {
            event_loop.shutdown();
        }
        Ok(())
    }
}

impl Handler for SubprocessClient {
    type Timeout = usize;
    type Message = ();

    /// Dispatches one readiness notification to the stdout reader and/or the
    /// stdin writer.
    ///
    /// Errors from either side are reported and stop the event loop rather
    /// than being silently discarded. With level-triggered registration
    /// (`PollOpt::level()`), an ignored error is likely to be reported again
    /// on every poll.
    fn ready(&mut self, event_loop: &mut EventLoop<SubprocessClient>, token: Token,
             events: EventSet) {
        println!("ready {:?} {:?}", token, events);
        if events.is_readable() {
            if let Err(e) = self.readable(event_loop) {
                println!("CLIENT : read failed: {:?}", e);
                event_loop.shutdown();
            }
        }
        if events.is_writable() {
            if let Err(e) = self.writable(event_loop) {
                println!("CLIENT : write failed: {:?}", e);
                event_loop.shutdown();
            }
        }
    }
}



/// Converts a `nix` error into a `std::io::Error` carrying the same raw OS
/// error number.
pub fn from_nix_error(err: ::nix::Error) -> io::Error {
    let code = err.errno() as i32;
    io::Error::from_raw_os_error(code)
}

fn set_nonblock(s: &AsRawFd) -> io::Result<()> {
    fcntl(s.as_raw_fd(), F_SETFL(O_NONBLOCK)).map_err(from_nix_error)
                                             .map(|_| ())
}


const TEST_DATA : [u8; 1024 * 4096] = [40; 1024 * 4096];
pub fn echo_server() {
    let mut event_loop = EventLoop::<SubprocessClient>::new().unwrap();
    let process =
           Command::new("cat")
           .stdin(Stdio::piped())
           .stdout(Stdio::piped())
           .spawn().unwrap();
    let raw_stdin_fd;
    match process.stdin {
      None => unreachable!(),
      Some(ref item) => {
          let err = set_nonblock(item);
          match err {
             Ok(()) => {},
             Err(e) => panic!(e),
          }
          raw_stdin_fd = item.as_raw_fd();
      },
    }
    let raw_stdout_fd;
    match process.stdout {
      None => unreachable!(),
      Some(ref item) => {
          let err = set_nonblock(item);
          match err {
             Ok(()) => {},
             Err(e) => panic!(e),
          }
          raw_stdout_fd = item.as_raw_fd();},
    }
    //println!("listen for connections {:?} {:?}", , process.stdout.unwrap().as_raw_fd());
    let mut subprocess = SubprocessClient::new(PipeWriter::from(Io::from_raw_fd(raw_stdin_fd)),
                                               PipeReader::from(Io::from_raw_fd(raw_stdout_fd)),
                                               &TEST_DATA[..]);
    let stdout_token : Token = Token(0);
    let stdin_token : Token = Token(1);
    event_loop.register(&subprocess.stdout, stdout_token, EventSet::readable(),
                        PollOpt::level()).unwrap();

    // Connect to the server
    event_loop.register(&subprocess.stdin, stdin_token, EventSet::writable(),
                        PollOpt::level()).unwrap();

    // Start the event loop
    event_loop.run(&mut subprocess).unwrap();
    let res = process.wait_with_output();
    match res {
       Err(e) => {panic!(e);},
       Ok(output) => {
          subprocess.output.extend(&output.stdout);
          println!("Final output was {:}\n", output.stdout.len());
       },
    }
    println!("{:?}\n", subprocess.output.len());
}

/// Entry point: runs the `cat` echo demonstration once.
fn main() {
    echo_server()
}

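The only way I found to close stdin here was to call `process.wait_with_output`, since `ChildStdin` has no `close` method. (In general, dropping a `ChildStdin`, e.g. `drop(child.stdin.take())`, also closes the pipe.)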

Once that happens, `wait_with_output` returns the output that the event loop had not read yet, and it can be appended to the output vector.

There's now a crate that does this

https://crates.io/crates/subprocess-communicate

hellcatv
  • 573
  • 4
  • 21
-1

In this particular example, you know that `cat` writes back exactly as many bytes as it receives, so you don't need threads at all. You can just write a bit and then read a bit:

use std::io::{self, Cursor, Read, Write};

static DATA: [u8; 1024 * 4096] = [0x3f; 1024 * 4096];
/// Maximum number of bytes sent to the child before reading its echo back.
/// Assumed to be smaller than the OS pipe buffer.
const TRANSFER_LIMIT: u64 = 32 * 1024;

/// Pipes `DATA` through `cat` on one thread using plain blocking I/O.
///
/// This relies on `cat` echoing back exactly the bytes it receives. Each
/// round writes at most `TRANSFER_LIMIT` bytes and then reads back exactly
/// as many bytes as were written. Any input length works, including lengths
/// that are not a multiple of `TRANSFER_LIMIT`.
///
/// # Panics
/// Panics if the child cannot be started, if either pipe fails, if the
/// child's output ends early, or if the echoed data differs from `DATA`.
fn main() {
    let mut child = std::process::Command::new("cat")
        .stdout(std::process::Stdio::piped())
        .stdin(std::process::Stdio::piped())
        .spawn()
        .expect("Could not start child");

    let mut input = Cursor::new(&DATA[..]);
    let mut output: Vec<u8> = Vec::with_capacity(DATA.len());

    match (child.stdin.as_mut(), child.stdout.as_mut()) {
        (Some(stdin), Some(stdout)) => {
            while input.position() < input.get_ref().len() as u64 {
                let written = io::copy(&mut input.by_ref().take(TRANSFER_LIMIT), stdin)
                    .expect("Could not copy input");
                // Read back exactly what was sent. Asking for a full
                // TRANSFER_LIMIT after a short final chunk would block forever:
                // stdin is still open, so `cat` never sees EOF and has nothing
                // more to write.
                let echoed = io::copy(&mut stdout.take(written), &mut output)
                    .expect("Could not copy output");
                assert_eq!(echoed, written, "child output ended early");
            }
        }
        _ => panic!("child process input and output were not opened"),
    }

    // `wait` closes the child's stdin first, so `cat` sees EOF and exits.
    child.wait().expect("Could not join child");

    assert_eq!(output.len(), DATA.len());
    assert_eq!(&*output, &DATA[..]);
}

If you don't have that guarantee, you will need `select` (or `poll`) from the libc crate. Those operate on the pipes' raw file descriptors, so your code will probably be restricted to Unix-like systems such as Linux and OS X.

You could also start threads, one for each pipe (and reuse the parent thread for one of the pipes), but you've already ruled that out.

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
  • Interesting...this other stackoverflow question hints at possible use of mio to solve the problem, but no further explanation is given http://stackoverflow.com/questions/34611742/how-do-i-read-the-output-of-a-child-process-without-blocking-in-rust – hellcatv May 09 '16 at 17:15
  • We can also assume this is another type of process like gunzip where the input and output ratios are unpredictable – hellcatv May 09 '16 at 17:23