Created
January 20, 2017 00:03
-
-
Save jsgf/acb074cfbd7aa19a9927ed7038874177 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures::{Async, AsyncSink, Future, Poll}; | |
use futures::stream::{Fuse, Stream}; | |
use futures::sink::Sink; | |
/// Fork a Stream into two | |
/// | |
/// Returns a Future for a process that consumes items from a Stream and | |
/// forwards them to two sinks depending on a predicate. If the predicate | |
/// returns false, send the value to out1, otherwise out2. | |
pub fn streamfork<In, Out1, Out2, F, E>(inp: In, out1: Out1, out2: Out2, pred: F) | |
-> Forker<In, Out1, Out2, F> | |
where In: Stream, | |
Out1: Sink, | |
Out2: Sink, | |
F: FnMut(&In::Item) -> Result<bool, E> { | |
Forker { | |
inp: Some(inp.fuse()), | |
out1: Out::new(out1), | |
out2: Out::new(out2), | |
pred: pred, | |
} | |
} | |
pub struct Forker<In, Out1, Out2, F> | |
where In: Stream, | |
Out1: Sink, | |
Out2: Sink | |
{ | |
inp: Option<Fuse<In>>, | |
out1: Out<Out1>, | |
out2: Out<Out2>, | |
pred: F, | |
} | |
struct Out<O> where O: Sink { | |
out: Option<O>, | |
buf: Option<O::SinkItem>, | |
} | |
impl<S: Sink> Out<S> { | |
fn new(s: S) -> Self { | |
Out { out: Some(s), buf: None } | |
} | |
fn out_mut(&mut self) -> &mut S { | |
self.out.as_mut().take().expect("Out after completion") | |
} | |
fn take_result(&mut self) -> S { | |
self.out.take().expect("Out missing") | |
} | |
fn try_start_send(&mut self, item: S::SinkItem) -> Poll<(), S::SinkError> { | |
debug_assert!(self.buf.is_none()); | |
if let AsyncSink::NotReady(item) = try!(self.out_mut().start_send(item)) { | |
self.buf = Some(item); | |
return Ok(Async::NotReady) | |
} | |
Ok(Async::Ready(())) | |
} | |
fn push(&mut self) -> Option<Poll<(), S::SinkError>> { | |
if let Some(item) = self.buf.take() { | |
Some(self.try_start_send(item)) | |
} else { | |
None | |
} | |
} | |
fn poll_complete(&mut self) -> Poll<(), S::SinkError> { | |
self.out_mut().poll_complete() | |
} | |
} | |
impl<In, Out1, Out2, F> Forker<In, Out1, Out2, F> | |
where In: Stream, | |
Out1: Sink, | |
Out2: Sink | |
{ | |
fn inp_mut(&mut self) -> &mut Fuse<In> { | |
self.inp.as_mut().take().expect("Input after completion") | |
} | |
fn take_result(&mut self) -> (In, Out1, Out2) { | |
let inp = self.inp.take().expect("Input missing in result"); | |
let out1 = self.out1.take_result(); | |
let out2 = self.out2.take_result(); | |
(inp.into_inner(), out1, out2) | |
} | |
} | |
#[derive(Debug)] | |
pub enum ForkError<Einp, Epred, Eout1, Eout2> { | |
Input(Einp), | |
Predicate(Epred), | |
Out1(Eout1), | |
Out2(Eout2), | |
} | |
impl<Einp, Epred, Eout1, Eout2> ForkError<Einp, Epred, Eout1, Eout2> { | |
fn input(err: Einp) -> Self { ForkError::Input(err) } | |
fn pred(err: Epred) -> Self { ForkError::Predicate(err) } | |
fn out1(err: Eout1) -> Self { ForkError::Out1(err) } | |
fn out2(err: Eout2) -> Self { ForkError::Out2(err) } | |
} | |
impl<In, Out1, Out2, F, Epred> Future for Forker<In, Out1, Out2, F> | |
where In: Stream, | |
Out1: Sink<SinkItem=In::Item>, | |
Out2: Sink<SinkItem=In::Item>, | |
F: FnMut(&In::Item) -> Result<bool, Epred> | |
{ | |
type Item = (In, Out1, Out2); | |
type Error = ForkError<In::Error, Epred, Out1::SinkError, Out2::SinkError>; | |
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | |
match self.out1.push() { | |
Some(Err(e)) => return Err(ForkError::out1(e)), | |
Some(Ok(Async::NotReady)) => return Ok(Async::NotReady), | |
_ => (), | |
} | |
match self.out2.push() { | |
Some(Err(e)) => return Err(ForkError::out2(e)), | |
Some(Ok(Async::NotReady)) => return Ok(Async::NotReady), | |
_ => (), | |
} | |
loop { | |
match try!(self.inp_mut().poll().map_err(ForkError::input)) { | |
Async::Ready(Some(item)) => { | |
match (self.pred)(&item) { | |
Ok(false) => try_ready!(self.out1.try_start_send(item).map_err(ForkError::out1)), | |
Ok(true) => try_ready!(self.out2.try_start_send(item).map_err(ForkError::out2)), | |
Err(e) => return Err(ForkError::pred(e)), | |
} | |
} | |
Async::Ready(None) => { | |
try_ready!(self.out1.poll_complete().map_err(ForkError::out1)); | |
try_ready!(self.out2.poll_complete().map_err(ForkError::out2)); | |
return Ok(Async::Ready(self.take_result())); | |
} | |
Async::NotReady => { | |
try_ready!(self.out1.poll_complete().map_err(ForkError::out1)); | |
try_ready!(self.out2.poll_complete().map_err(ForkError::out2)); | |
return Ok(Async::NotReady); | |
} | |
} | |
} | |
} | |
} | |
#[cfg(test)] | |
mod test { | |
use super::*; | |
use futures::Future; | |
use futures::stream::iter; | |
#[test] | |
fn simple() { | |
let even = Vec::new(); | |
let odd = Vec::new(); | |
let nums = iter((0i32..10).into_iter().map(Ok::<_,()>)); | |
let (_, even, odd) = streamfork(nums, even, odd, |n| Ok::<_,()>(*n % 2 == 1)).wait().unwrap(); | |
println!("even={:?}", even); | |
println!("odd={:?}", odd); | |
assert_eq!(even, vec![0,2,4,6,8]); | |
assert_eq!(odd, vec![1,3,5,7,9]); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment