Skip to content

Instantly share code, notes, and snippets.

@danii
Created January 31, 2021 21:00
Show Gist options
  • Save danii/fdf11d9a986b7633c9d4545b68ec58d2 to your computer and use it in GitHub Desktop.
Save danii/fdf11d9a986b7633c9d4545b68ec58d2 to your computer and use it in GitHub Desktop.
use std::{
collections::VecDeque,
future::Future,
io::{Error as IOError, ErrorKind as IOErrorKind, Result as IOResult},
ops::Try,
pin::Pin,
task::{Context, Poll}
};
use tokio::{fs::File, io::{AsyncRead, AsyncWrite, ReadBuf}, process::Child};
/// Pipes an asynchronous reader through a child process.
///
/// Bytes pulled from `input` are staged in `buffer` and written to the
/// child's stdin; reading from the driver yields the child's stdout.
pub struct CommandDriver<R: AsyncRead + Unpin> {
    /// Source of the bytes fed to the child. Set to `None` once exhausted.
    input: Option<R>,
    /// Bytes already read from `input` but not yet accepted by the child's
    /// stdin.
    buffer: VecDeque<u8>,
    /// The child process whose stdin is fed and whose stdout is read.
    process: Child,
}
impl<R> CommandDriver<R>
where R: AsyncRead + Unpin {
pub fn new(input: R, process: Child) -> Self {
Self {
input: Some(input),
buffer: VecDeque::new(),
process: process
}
}
}
impl<R> AsyncRead for CommandDriver<R>
where
    R: AsyncRead + Unpin,
{
    /// Reads the child's stdout into `buffer`, feeding the child's stdin from
    /// `input` whenever stdout has nothing to offer yet.
    ///
    /// Each pass of the loop:
    /// 1. checks whether the child has exited unsuccessfully,
    /// 2. polls the child's stdout and returns its result if it is ready,
    /// 3. otherwise refills the staging buffer from `input` (if empty) and
    ///    tries to write the staged bytes to the child's stdin.
    ///
    /// When `input` reports end of file, both `input` and the child's stdin
    /// handle are dropped so the child observes EOF on its stdin.
    ///
    /// # Errors
    ///
    /// * `BrokenPipe` if the child has exited with a non-success status.
    /// * `WriteZero` if the child's stdin accepts zero bytes of a non-empty
    ///   buffer.
    /// * Any error reported by `try_wait`, `input`, or the child's stdin or
    ///   stdout.
    ///
    /// # Panics
    ///
    /// Panics if the child's stdout handle is `None`.
    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buffer: &mut ReadBuf)
        -> Poll<IOResult<()>> {
        // A loop (rather than re-entering `poll_read`) keeps stack usage
        // constant no matter how many writes succeed back to back.
        loop {
            let Self { input, buffer: staged, process } = &mut *self;

            match process.try_wait() {
                Ok(Some(exit)) if !exit.success() => {
                    return Poll::Ready(Err(IOError::new(
                        IOErrorKind::BrokenPipe,
                        "Non 0 exit code.",
                    )));
                }
                Err(error) => return Poll::Ready(Err(error)),
                _ => (),
            }

            let Child { stdin: standard_in, stdout: standard_out, .. } = process;
            let standard_out = standard_out
                .as_mut()
                .expect("Expected process' stdout handle.");

            // `standard_out` has data, EOF, or an error: hand it straight back.
            if let Poll::Ready(result) = Pin::new(standard_out).poll_read(cx, buffer) {
                return Poll::Ready(result);
            }

            // `standard_out` is pending. Refill the staging buffer from
            // `input` in the meantime, but only once it has been fully
            // flushed.
            if let Some(source) = input {
                if staged.is_empty() {
                    let mut data = [0; 2048];
                    let mut buf = ReadBuf::new(&mut data);
                    match Pin::new(source).poll_read(cx, &mut buf) {
                        // `input` has an error, so we bubble it.
                        Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
                        // `input` doesn't have any data yet.
                        Poll::Pending => return Poll::Pending,
                        // `input` has data, or reached EOF (nothing filled).
                        Poll::Ready(Ok(())) => (),
                    }
                    staged.extend(buf.filled());
                }
            }

            if let Some(sink) = standard_in {
                if !staged.is_empty() {
                    let chunk = staged.make_contiguous();
                    match Pin::new(sink).poll_write(cx, chunk) {
                        // `standard_in` has an error, so we bubble it.
                        Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
                        // No progress on a non-empty buffer: retrying would
                        // spin forever, so report it like `write_all` does.
                        Poll::Ready(Ok(0)) => {
                            return Poll::Ready(Err(IOError::new(
                                IOErrorKind::WriteZero,
                                "Child's stdin accepted 0 bytes.",
                            )));
                        }
                        // `standard_in` accepted data; drop it from the
                        // staging buffer and poll everything again.
                        Poll::Ready(Ok(count)) => {
                            staged.drain(..count);
                            continue;
                        }
                        // `standard_in` can't accept data right now.
                        Poll::Pending => return Poll::Pending,
                    };
                } else {
                    // Nothing staged after polling `input`, so `input` has
                    // been exhausted. Dropping the stdin handle closes it,
                    // which lets the child see EOF.
                    *input = None;
                    *standard_in = None;
                    // We're still waiting on `standard_out`.
                    return Poll::Pending;
                }
            } else {
                // No stdin to feed; we're still waiting on `standard_out`.
                return Poll::Pending;
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment