Created
October 15, 2020 09:29
-
-
Save matthiasbeyer/6b5f3a79f75a68c2bdef5536bcd8f57d to your computer and use it in GitHub Desktop.
Stream of buffers to stream of lines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// should be all code... | |
use std::pin::Pin; | |
use std::result::Result as RResult; | |
use futures::Stream; | |
use futures::StreamExt; | |
use futures::task::Context; | |
use futures::task::Poll; | |
use anyhow::Error; | |
use anyhow::Result; | |
use tokio::io::BufReader; | |
use tokio::io::StreamReader; | |
use tokio::io::stream_reader; | |
use tokio::io::AsyncBufRead; | |
use shiplift::tty::TtyChunk; | |
mod util; | |
use crate::util::*; | |
type IoResult<T> = RResult<T, tokio::io::Error>; | |
pub struct LogParser<S> | |
where S: Stream<Item = IoResult<TtyChunkBuf>> | |
{ | |
stream: BufReader<StreamReader<S, TtyChunkBuf>> | |
} | |
impl<S> LogParser<S> | |
where S: Stream<Item = IoResult<TtyChunkBuf>> | |
{ | |
pub fn new<Src>(stream: Src) -> Result<LogParser<S>> | |
where Src: Stream<Item = shiplift::Result<TtyChunk>> | |
{ | |
let stream = stream.map(|item| { | |
let f: IoResult<TtyChunkBuf> = item | |
.map(TtyChunkBuf::from) | |
.map_err(|e| tokio::io::Error::new(tokio::io::ErrorKind::Other, e)); | |
f | |
}); | |
let stream = stream_reader(stream); | |
let stream = BufReader::new(stream); | |
Ok(LogParser { stream }) | |
} | |
} | |
impl<S> Stream for LogParser<S> | |
where S: Stream<Item = IoResult<TtyChunkBuf>> | |
{ | |
type Item = Result<LogItem>; | |
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
let mut buf = String::new(); | |
let b = self.stream | |
.read_line(&mut buf) | |
.await?; | |
if b == 0 { | |
Poll::Ready(None) | |
} else { | |
Poll::Ready(Some(LogItem::Line(buf))) // TODO: Parse | |
} | |
} | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::collections::VecDeque; | |
use bytes::buf::Buf; | |
use shiplift::tty::TtyChunk; | |
pub enum TtyChunkBuf { | |
StdIn(VecDeque<u8>), | |
StdOut(VecDeque<u8>), | |
StdErr(VecDeque<u8>), | |
} | |
impl From<TtyChunk> for TtyChunkBuf { | |
fn from(c: TtyChunk) -> Self { | |
match c { | |
TtyChunk::StdIn(buffer) => TtyChunkBuf::StdIn(VecDeque::from(buffer)), | |
TtyChunk::StdOut(buffer) => TtyChunkBuf::StdOut(VecDeque::from(buffer)), | |
TtyChunk::StdErr(buffer) => TtyChunkBuf::StdErr(VecDeque::from(buffer)), | |
} | |
} | |
} | |
impl Buf for TtyChunkBuf { | |
fn remaining(&self) -> usize { | |
match self { | |
TtyChunkBuf::StdIn(buf) => buf.remaining(), | |
TtyChunkBuf::StdErr(buf) => buf.remaining(), | |
TtyChunkBuf::StdOut(buf) => buf.remaining(), | |
} | |
} | |
fn bytes<'a>(&'a self) -> &'a [u8] { | |
match self { | |
TtyChunkBuf::StdIn(buf) => buf.bytes(), | |
TtyChunkBuf::StdErr(buf) => buf.bytes(), | |
TtyChunkBuf::StdOut(buf) => buf.bytes(), | |
} | |
} | |
fn advance(&mut self, cnt: usize) { | |
match self { | |
TtyChunkBuf::StdIn(buf) => buf.advance(cnt), | |
TtyChunkBuf::StdErr(buf) => buf.advance(cnt), | |
TtyChunkBuf::StdOut(buf) => buf.advance(cnt), | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment