I want to implement a futures::Stream
for reading and parsing the standard output of a child subprocess.
What I'm doing at the moment:
spawn subprocess and obtain its stdout via std::process
methods: let child = Command::new(...).stdout(Stdio.pipe()).spawn().expect(...)
add AsyncRead
and BufRead
to stdout:
let stdout = BufReader::new(tokio_io::io::AllowStdIo::new(
child.stdout.expect("Failed to open stdout"),
));
declare a wrapper struct for stdout:
struct MyStream<Io: AsyncRead + BufRead> {
io: Io,
}
implement Stream
:
impl<Io: AsyncRead + BufRead> Stream for MyStream<Io> {
type Item = Message;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Message>, Error> {
let mut line = String::new();
let n = try_nb!(self.io.read_line(&mut line));
if n == 0 {
return Ok(None.into());
}
//...read & parse further
}
}
The problem is that AllowStdIo
doesn't make ChildStdout
magically asynchronous and the self.io.read_line
call still blocks.
I guess I need to pass something different instead of Stdio::pipe()
to have it asynchronous, but what? Or is there a different solution for that?
This question is different from What is the best approach to encapsulate blocking I/O in future-rs? because I want to get asynchronous I/O for the specific case of a subprocess and not solve the problem of encapsulation of synchronous I/O.
Update: I'm using tokio = "0.1.3"
to leverage its runtime feature and using tokio-process
is not an option at the moment (https://github.com/alexcrichton/tokio-process/issues/27)
Here is my version using tokio::process
let mut child = match Command::new(&args.run[0])
.args(parameters)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()
{
Ok(c) => c,
Err(e) => panic!("Unable to start process `{}`. {}", args.run[0], e),
};
let stdout = child.stdout.take().expect("child did not have a handle to stdout");
let stderr = child.stderr.take().expect("child did not have a handle to stderr");
let mut stdout_reader = BufReader::new(stdout).lines();
let mut stderr_reader = BufReader::new(stderr).lines();
loop {
tokio::select! {
result = stdout_reader.next_line() => {
match result {
Ok(Some(line)) => println!("Stdout: {}", line),
Err(_) => break,
_ => (),
}
}
result = stderr_reader.next_line() => {
match result {
Ok(Some(line)) => println!("Stderr: {}", line),
Err(_) => break,
_ => (),
}
}
result = child.wait() => {
match result {
Ok(exit_code) => println!("Child process exited with {}", exit_code),
_ => (),
}
break // child process exited
}
};
}