Created
January 31, 2021 21:00
-
-
Save danii/fdf11d9a986b7633c9d4545b68ec58d2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{ | |
collections::VecDeque, | |
future::Future, | |
io::{Error as IOError, ErrorKind as IOErrorKind, Result as IOResult}, | |
ops::Try, | |
pin::Pin, | |
task::{Context, Poll} | |
}; | |
use tokio::{fs::File, io::{AsyncRead, AsyncWrite, ReadBuf}, process::Child}; | |
pub struct CommandDriver<R> | |
where R: AsyncRead + Unpin { | |
input: Option<R>, | |
buffer: VecDeque<u8>, | |
process: Child | |
} | |
impl<R> CommandDriver<R> | |
where R: AsyncRead + Unpin { | |
pub fn new(input: R, process: Child) -> Self { | |
Self { | |
input: Some(input), | |
buffer: VecDeque::new(), | |
process: process | |
} | |
} | |
} | |
impl<R> AsyncRead for CommandDriver<R> | |
where R: AsyncRead + Unpin { | |
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buffer: &mut ReadBuf) | |
-> Poll<IOResult<()>> { | |
let Self { | |
ref mut input, | |
buffer: ref mut internal_buf, | |
ref mut process | |
} = &mut *self; | |
match process.try_wait() { | |
Ok(Some(exit)) if !exit.success() => return Poll::Ready( | |
Err(IOError::new(IOErrorKind::BrokenPipe, "Non 0 exit code."))), | |
Err(error) => return Poll::Ready(Err(error)), | |
_ => () | |
} | |
let Child { | |
stdin: ref mut standard_in, | |
stdout: ref mut standard_out, | |
.. | |
} = process; | |
let standard_out = standard_out.as_mut() | |
.expect("Expected process' stdout handle."); | |
match Pin::new(standard_out).poll_read(cx, buffer) { | |
// `standard_out` has some data. | |
Poll::Ready(result) => Poll::Ready(result), | |
// `standard_out` doesn't have data. | |
Poll::Pending => { | |
// Read from `input` in the meantime, if our internal buffer is empty. | |
if let Some(input) = input { | |
if internal_buf.len() == 0 { | |
let mut data = [0; 2048]; | |
let mut buf = ReadBuf::new(&mut data); | |
match Pin::new(input).poll_read(cx, &mut buf) { | |
// `input` has an error, so we bubble it. | |
Poll::Ready(Err(error)) => return Poll::Ready(Err(error)), | |
// `input` doesn't have any data. | |
Poll::Pending => return Poll::Pending, | |
// `input` has data. | |
Poll::Ready(Ok(())) => () | |
} | |
// Drop the temporary ReadBuf, so we may extend our buffer. | |
let buf = buf.filled().len(); | |
internal_buf.extend(data[..buf].iter()); | |
} | |
} | |
// Write to `standard_in` in the meantime, if our internal buffer isn't | |
// empty. | |
if let Some(standard_inn) = standard_in { | |
if internal_buf.len() != 0 { | |
let buf = internal_buf.make_contiguous(); | |
match Pin::new(standard_inn).poll_write(cx, buf) { | |
// `standard_in` has an error, so we bubble it. | |
Poll::Ready(Err(error)) => Poll::Ready(Err(error)), | |
// `standard_in` has accepted data. | |
Poll::Ready(Ok(count)) => { | |
internal_buf.drain(..count); | |
// Re poll. | |
self.poll_read(cx, buffer) | |
}, | |
// `standard_in` doesn't have any data. | |
Poll::Pending => Poll::Pending | |
} | |
} else { | |
// `input` has been exhausted. | |
*input = None; | |
*standard_in = None; // Deinitializes the standard_in handle. | |
// For whatever reason, shutdown doesn't do that. | |
// We're still waiting... | |
Poll::Pending | |
} | |
} else { | |
// We're still waiting... | |
Poll::Pending | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment