Well it wasn't a small amount of code to get this done, and I needed a combination of mio and nix, because mio wouldn't set AsRawFd items to be nonblocking when they were pipes, so this had to be done first.
Here's the result
extern crate mio;
extern crate bytes;
use mio::*;
use std::io;
use mio::unix::{PipeReader, PipeWriter};
use std::process::{Command, Stdio};
use std::os::unix::io::AsRawFd;
use nix::fcntl::FcntlArg::F_SETFL;
use nix::fcntl::{fcntl, O_NONBLOCK};
extern crate nix;
struct SubprocessClient {
stdin: PipeWriter,
stdout: PipeReader,
output : Vec<u8>,
input : Vec<u8>,
input_offset : usize,
buf : [u8; 65536],
}
// Sends a message and expects to receive the same exact message, one at a time
impl SubprocessClient {
fn new(stdin: PipeWriter, stdout : PipeReader, data : &[u8]) -> SubprocessClient {
SubprocessClient {
stdin: stdin,
stdout: stdout,
output : Vec::<u8>::new(),
buf : [0; 65536],
input : data.to_vec(),
input_offset : 0,
}
}
fn readable(&mut self, _event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
println!("client socket readable");
match self.stdout.try_read(&mut self.buf[..]) {
Ok(None) => {
println!("CLIENT : spurious read wakeup");
}
Ok(Some(r)) => {
println!("CLIENT : We read {} bytes!", r);
self.output.extend(&self.buf[0..r]);
}
Err(e) => {
return Err(e);
}
};
return Ok(());
}
fn writable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
println!("client socket writable");
match self.stdin.try_write(&(&self.input)[self.input_offset..]) {
Ok(None) => {
println!("client flushing buf; WOULDBLOCK");
}
Ok(Some(r)) => {
println!("CLIENT : we wrote {} bytes!", r);
self.input_offset += r;
}
Err(e) => println!("not implemented; client err={:?}", e)
}
if self.input_offset == self.input.len() {
event_loop.shutdown();
}
return Ok(());
}
}
impl Handler for SubprocessClient {
type Timeout = usize;
type Message = ();
fn ready(&mut self, event_loop: &mut EventLoop<SubprocessClient>, token: Token,
events: EventSet) {
println!("ready {:?} {:?}", token, events);
if events.is_readable() {
let _x = self.readable(event_loop);
}
if events.is_writable() {
let _y = self.writable(event_loop);
}
}
}
pub fn from_nix_error(err: ::nix::Error) -> io::Error {
io::Error::from_raw_os_error(err.errno() as i32)
}
fn set_nonblock(s: &AsRawFd) -> io::Result<()> {
fcntl(s.as_raw_fd(), F_SETFL(O_NONBLOCK)).map_err(from_nix_error)
.map(|_| ())
}
const TEST_DATA : [u8; 1024 * 4096] = [40; 1024 * 4096];
pub fn echo_server() {
let mut event_loop = EventLoop::<SubprocessClient>::new().unwrap();
let process =
Command::new("cat")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn().unwrap();
let raw_stdin_fd;
match process.stdin {
None => unreachable!(),
Some(ref item) => {
let err = set_nonblock(item);
match err {
Ok(()) => {},
Err(e) => panic!(e),
}
raw_stdin_fd = item.as_raw_fd();
},
}
let raw_stdout_fd;
match process.stdout {
None => unreachable!(),
Some(ref item) => {
let err = set_nonblock(item);
match err {
Ok(()) => {},
Err(e) => panic!(e),
}
raw_stdout_fd = item.as_raw_fd();},
}
//println!("listen for connections {:?} {:?}", , process.stdout.unwrap().as_raw_fd());
let mut subprocess = SubprocessClient::new(PipeWriter::from(Io::from_raw_fd(raw_stdin_fd)),
PipeReader::from(Io::from_raw_fd(raw_stdout_fd)),
&TEST_DATA[..]);
let stdout_token : Token = Token(0);
let stdin_token : Token = Token(1);
event_loop.register(&subprocess.stdout, stdout_token, EventSet::readable(),
PollOpt::level()).unwrap();
// Connect to the server
event_loop.register(&subprocess.stdin, stdin_token, EventSet::writable(),
PollOpt::level()).unwrap();
// Start the event loop
event_loop.run(&mut subprocess).unwrap();
let res = process.wait_with_output();
match res {
Err(e) => {panic!(e);},
Ok(output) => {
subprocess.output.extend(&output.stdout);
println!("Final output was {:}\n", output.stdout.len());
},
}
println!("{:?}\n", subprocess.output.len());
}
fn main() {
echo_server();
}
Basically the only way to close stdin was to call process.wait_with_output since the Stdin has no close primitive
Once this happened, the remaining input could extend the output data vector.
There's now a crate that does this
https://crates.io/crates/subprocess-communicate