Last active
January 22, 2019 20:48
-
-
Save dgllghr/2b93d28a124f65d26d5b500a21789151 to your computer and use it in GitHub Desktop.
Async stream reader using csv-core and tokio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use bytes::Buf; | |
use std::io; | |
use tokio::prelude::*; | |
pub struct ReadStream<B> { | |
max_record_size: usize, | |
inbuf: Vec<u8>, | |
inbuf_offset: usize, | |
outbuf: Vec<u8>, | |
outbuf_offset: usize, | |
endsbuf: Vec<usize>, | |
endsbuf_offset: usize, | |
reader: csv_core::Reader, | |
inner: stream::Fuse<Box<Stream<Item = B, Error = io::Error> + Send>>, | |
} | |
impl<B> ReadStream<B> | |
where | |
B: Buf, | |
{ | |
pub fn new( | |
stream: Box<Stream<Item = B, Error = io::Error> + Send>, | |
reader: csv_core::Reader, | |
max_record_size: usize, | |
) -> Self { | |
let outbuf = vec![0; 256]; | |
let endsbuf = vec![0; 16]; | |
ReadStream { | |
max_record_size, | |
inbuf: Vec::new(), | |
inbuf_offset: 0, | |
outbuf, | |
outbuf_offset: 0, | |
endsbuf, | |
endsbuf_offset: 0, | |
reader: reader, | |
inner: stream.fuse(), | |
} | |
} | |
fn read_in<T>(&mut self) -> Result<Option<Async<Option<T>>>, io::Error> { | |
match self.inner.poll()? { | |
Async::NotReady => { | |
return Ok(Some(Async::NotReady)); | |
} | |
Async::Ready(None) => { | |
// The input stream is exhausted but this stream is not | |
// complete until the csv_core::Reader is complete | |
return Ok(None); | |
} | |
Async::Ready(Some(buffer)) => { | |
if self.inbuf.capacity() == 0 { | |
self.inbuf.reserve_exact(buffer.remaining() * 4); | |
} | |
if buffer.remaining() + self.inbuf.len() > self.inbuf.capacity() { | |
self.truncate_inbuf(); | |
} | |
self.inbuf.extend_from_slice(buffer.bytes()); | |
return Ok(None); | |
} | |
} | |
} | |
#[inline] | |
fn inbuf_empty(&self) -> bool { | |
self.inbuf.is_empty() || self.inbuf.len() - self.inbuf_offset <= 1 | |
} | |
#[inline] | |
fn truncate_inbuf(&mut self) { | |
self.inbuf_offset = 0; | |
self.inbuf.truncate(0); | |
} | |
#[inline] | |
fn read_record(&mut self) -> csv_core::ReadRecordResult { | |
let inbuf = &self.inbuf[self.inbuf_offset..]; | |
let outbuf = &mut self.outbuf[self.outbuf_offset..]; | |
let endsbuf = &mut self.endsbuf[self.endsbuf_offset..]; | |
let (res, r, w, ne) = self.reader.read_record(inbuf, outbuf, endsbuf); | |
self.inbuf_offset = self.inbuf_offset + r; | |
self.outbuf_offset = self.outbuf_offset + w; | |
self.endsbuf_offset = self.endsbuf_offset + ne; | |
res | |
} | |
} | |
impl<B> Stream for ReadStream<B> | |
where | |
B: Buf, | |
{ | |
type Item = Record; | |
type Error = io::Error; | |
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> { | |
loop { | |
if self.inbuf_empty() { | |
let read_res = self.read_in()?; | |
if let Some(res) = read_res { | |
return Ok(res); | |
} | |
} | |
let res = self.read_record(); | |
match res { | |
csv_core::ReadRecordResult::InputEmpty => { | |
let read_res = self.read_in()?; | |
if let Some(res) = read_res { | |
return Ok(res); | |
} | |
} | |
csv_core::ReadRecordResult::OutputFull => { | |
let new_size = std::cmp::min(self.max_record_size, self.outbuf.len() * 2); | |
if new_size > self.outbuf.len() { | |
self.outbuf.resize(new_size, 0); | |
continue; | |
} else { | |
return Err(io::Error::new( | |
io::ErrorKind::InvalidData, | |
"record too large", | |
)); | |
} | |
} | |
csv_core::ReadRecordResult::OutputEndsFull => { | |
self.endsbuf.resize(self.endsbuf.len() * 2, 0); | |
} | |
csv_core::ReadRecordResult::End => { | |
return Ok(Async::Ready(None)); | |
} | |
csv_core::ReadRecordResult::Record => { | |
let csv_rec = Record::new(&self.outbuf, &self.endsbuf, self.endsbuf_offset); | |
self.outbuf_offset = 0; | |
self.endsbuf_offset = 0; | |
return Ok(Async::Ready(Some(csv_rec))); | |
} | |
} | |
} | |
} | |
} | |
/// One parsed CSV record. It stores the unescaped bytes of all fields back to back,
/// plus the end offset of each field within those bytes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Record {
    /// Field contents concatenated without separators.
    buf: Vec<u8>,
    /// `endsbuf[i]` is the exclusive end of field `i` in `buf`. Field `i` starts where
    /// field `i - 1` ends, or at 0 for the first field.
    endsbuf: Vec<usize>,
}

impl Record {
    /// Builds a record by copying field data out of parser scratch buffers.
    ///
    /// `buf` holds the concatenated field bytes. The first `num_ends` entries of
    /// `endsbuf` are the exclusive end offsets of each field in `buf`. Only the prefix of
    /// `buf` that those offsets reach is copied, so an oversized scratch buffer does not
    /// bloat the record.
    ///
    /// # Panics
    ///
    /// Panics if `num_ends > endsbuf.len()`.
    pub fn new(buf: &[u8], endsbuf: &[usize], num_ends: usize) -> Self {
        let ends = &endsbuf[..num_ends];
        // Clamp to `buf.len()` so an out-of-range end still panics later in `iter`,
        // exactly as it would without trimming.
        let used = ends.iter().cloned().max().unwrap_or(0).min(buf.len());
        Record {
            buf: buf[..used].to_vec(),
            endsbuf: ends.to_vec(),
        }
    }

    /// Returns the number of fields in the record.
    pub fn len(&self) -> usize {
        self.endsbuf.len()
    }

    /// Returns `true` if the record has no fields.
    pub fn is_empty(&self) -> bool {
        self.endsbuf.is_empty()
    }

    /// Returns an iterator over the record's fields, in order, as byte slices.
    pub fn iter<'a>(&'a self) -> RecordIter<'a> {
        RecordIter {
            rec: self,
            offset: 0,
            end_offset: 0,
        }
    }
}

impl<'a> IntoIterator for &'a Record {
    type Item = &'a [u8];
    type IntoIter = RecordIter<'a>;

    fn into_iter(self) -> RecordIter<'a> {
        self.iter()
    }
}

/// Iterator over the fields of a [`Record`], yielding each field's bytes.
#[derive(Clone, Debug)]
pub struct RecordIter<'a> {
    rec: &'a Record,
    /// Start, within `rec.buf`, of the next field to yield.
    offset: usize,
    /// Index into `rec.endsbuf` of the next field to yield.
    end_offset: usize,
}

impl<'a> Iterator for RecordIter<'a> {
    type Item = &'a [u8];

    fn next(&mut self) -> Option<Self::Item> {
        let end = *self.rec.endsbuf.get(self.end_offset)?;
        let start = self.offset;
        self.offset = end;
        self.end_offset += 1;
        Some(&self.rec.buf[start..end])
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.rec.endsbuf.len().saturating_sub(self.end_offset);
        (remaining, Some(remaining))
    }
}

impl<'a> ExactSizeIterator for RecordIter<'a> {}

// `next` keeps returning `None` once `end_offset` has passed the last field.
impl<'a> std::iter::FusedIterator for RecordIter<'a> {}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment