Skip to content

Instantly share code, notes, and snippets.

@matthiasbeyer
Created October 15, 2020 09:29
Show Gist options
  • Save matthiasbeyer/6b5f3a79f75a68c2bdef5536bcd8f57d to your computer and use it in GitHub Desktop.
Save matthiasbeyer/6b5f3a79f75a68c2bdef5536bcd8f57d to your computer and use it in GitHub Desktop.
Stream of buffers to stream of lines
// should be all code...
use std::pin::Pin;
use std::result::Result as RResult;
use futures::Stream;
use futures::StreamExt;
use futures::task::Context;
use futures::task::Poll;
use anyhow::Error;
use anyhow::Result;
use tokio::io::BufReader;
use tokio::io::StreamReader;
use tokio::io::stream_reader;
use tokio::io::AsyncBufRead;
use shiplift::tty::TtyChunk;
mod util;
use crate::util::*;
type IoResult<T> = RResult<T, tokio::io::Error>;
pub struct LogParser<S>
where S: Stream<Item = IoResult<TtyChunkBuf>>
{
stream: BufReader<StreamReader<S, TtyChunkBuf>>
}
impl<S> LogParser<S>
where S: Stream<Item = IoResult<TtyChunkBuf>>
{
pub fn new<Src>(stream: Src) -> Result<LogParser<S>>
where Src: Stream<Item = shiplift::Result<TtyChunk>>
{
let stream = stream.map(|item| {
let f: IoResult<TtyChunkBuf> = item
.map(TtyChunkBuf::from)
.map_err(|e| tokio::io::Error::new(tokio::io::ErrorKind::Other, e));
f
});
let stream = stream_reader(stream);
let stream = BufReader::new(stream);
Ok(LogParser { stream })
}
}
impl<S> Stream for LogParser<S>
where S: Stream<Item = IoResult<TtyChunkBuf>>
{
type Item = Result<LogItem>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buf = String::new();
let b = self.stream
.read_line(&mut buf)
.await?;
if b == 0 {
Poll::Ready(None)
} else {
Poll::Ready(Some(LogItem::Line(buf))) // TODO: Parse
}
}
}
use std::collections::VecDeque;
use bytes::buf::Buf;
use shiplift::tty::TtyChunk;
pub enum TtyChunkBuf {
StdIn(VecDeque<u8>),
StdOut(VecDeque<u8>),
StdErr(VecDeque<u8>),
}
impl From<TtyChunk> for TtyChunkBuf {
fn from(c: TtyChunk) -> Self {
match c {
TtyChunk::StdIn(buffer) => TtyChunkBuf::StdIn(VecDeque::from(buffer)),
TtyChunk::StdOut(buffer) => TtyChunkBuf::StdOut(VecDeque::from(buffer)),
TtyChunk::StdErr(buffer) => TtyChunkBuf::StdErr(VecDeque::from(buffer)),
}
}
}
impl Buf for TtyChunkBuf {
fn remaining(&self) -> usize {
match self {
TtyChunkBuf::StdIn(buf) => buf.remaining(),
TtyChunkBuf::StdErr(buf) => buf.remaining(),
TtyChunkBuf::StdOut(buf) => buf.remaining(),
}
}
fn bytes<'a>(&'a self) -> &'a [u8] {
match self {
TtyChunkBuf::StdIn(buf) => buf.bytes(),
TtyChunkBuf::StdErr(buf) => buf.bytes(),
TtyChunkBuf::StdOut(buf) => buf.bytes(),
}
}
fn advance(&mut self, cnt: usize) {
match self {
TtyChunkBuf::StdIn(buf) => buf.advance(cnt),
TtyChunkBuf::StdErr(buf) => buf.advance(cnt),
TtyChunkBuf::StdOut(buf) => buf.advance(cnt),
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment