Skip to content

Instantly share code, notes, and snippets.

@habnabit
Created May 16, 2016 03:12
Show Gist options
  • Save habnabit/7519469d82928200013180f64e8b6a6f to your computer and use it in GitHub Desktop.
Save habnabit/7519469d82928200013180f64e8b6a6f to your computer and use it in GitHub Desktop.
use std::collections::VecDeque;
use std::sync::mpsc;
pub struct BipBufferReader<'a> {
incoming: mpsc::Receiver<&'a mut [u8]>,
ready: VecDeque<&'a mut [u8]>,
outgoing: mpsc::Sender<&'a mut [u8]>,
}
impl<'a> BipBufferReader<'a> {
pub fn read_buffer(&'a mut self, size: usize) -> Option<UncommittedSlice<'a>> {
while let Some(recvd) = self.incoming.try_recv().ok() {
self.ready.push_back(recvd);
}
self.ready.pop_front()
.map(move |slice| {
let slice = if slice.len() == size {
slice
} else {
let (slice, rest) = slice.split_at_mut(size);
self.ready.push_front(rest);
slice
};
UncommittedSlice { slice: slice, done: &self.outgoing }
})
}
}
pub struct BipBufferWriter<'a> {
incoming: mpsc::Receiver<&'a mut [u8]>,
ready: Vec<&'a mut [u8]>,
outgoing: mpsc::Sender<&'a mut [u8]>,
}
impl<'a> BipBufferWriter<'a> {
pub fn write_buffer(&'a mut self, size: usize) -> Option<UncommittedSlice<'a>> {
while let Some(recvd) = self.incoming.try_recv().ok() {
self.ready.push(recvd);
}
self.ready.sort();
self.ready.iter()
.position(|s| s.len() >= size)
.map(move |i| {
let slice = self.ready.swap_remove(i);
let slice = if slice.len() == size {
slice
} else {
let (slice, rest) = slice.split_at_mut(size);
self.ready.push(rest);
slice
};
UncommittedSlice { slice: slice, done: &self.outgoing }
})
}
}
pub struct UncommittedSlice<'a> {
slice: &'a mut [u8],
done: &'a mpsc::Sender<&'a mut [u8]>,
}
impl<'a> UncommittedSlice<'a> {
pub fn commit(self, size: usize) -> UncommittedSlice<'a> {
let (done, rest) = self.slice.split_at_mut(size);
let _ = self.done.send(done);
UncommittedSlice { slice: rest, done: self.done }
}
}
pub fn new<'a>() -> (BipBufferReader<'a>, BipBufferWriter<'a>) {
let (reader_outgoing, writer_incoming) = mpsc::channel();
let (writer_outgoing, reader_incoming) = mpsc::channel();
let reader = BipBufferReader {
incoming: reader_incoming,
ready: VecDeque::new(),
outgoing: reader_outgoing,
};
let writer = BipBufferWriter {
incoming: writer_incoming,
ready: vec![],
outgoing: writer_outgoing,
};
(reader, writer)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment