Skip to content

Instantly share code, notes, and snippets.

@robey
Created April 2, 2017 16:46
Show Gist options
  • Save robey/2ef986cf81735bc0ada2df712a6e770f to your computer and use it in GitHub Desktop.
Save robey/2ef986cf81735bc0ada2df712a6e770f to your computer and use it in GitHub Desktop.
use futures::{Async, Future, IntoFuture, Poll, Stream};
enum State<St, Fut> {
/// No active future is running. Ready to call the function an process the next future.
Ready(St),
/// Currently polling this future.
Working(Fut),
/// Stream ended!
Done(Option<St>)
}
/// Like `stream::unfold`, except the function always returns a future (so
/// the future may wait until it resolves to decide if the stream has ended),
/// and the final state can be extracted from the finished stream. (FIXME)
///
/// Given an initial state `state`, and a function `f`, this struct creates
/// a stream. Each time a new item is requested from the stream, it calls
/// `f`, which returns an `(Option<It>, St)`:
/// - `None`: the stream is over
/// - `Some(item)`: emit the item and call this function again with
/// the new state to generate the next item.
pub struct StreamGenerator<St, F, Fut>
where Fut: IntoFuture
{
f: F,
state: Option<State<St, Fut::Future>>
}
impl<St, F, Fut, It> StreamGenerator<St, F, Fut>
where F: FnMut(St) -> Fut,
Fut: IntoFuture<Item = (Option<It>, St)>
{
pub fn new(state: St, f: F) -> StreamGenerator<St, F, Fut> {
StreamGenerator {
f,
state: Some(State::Ready(state))
}
}
}
impl<St, F, Fut, It> Stream for StreamGenerator<St, F, Fut>
where
F: FnMut(St) -> Fut,
Fut: IntoFuture<Item = (Option<It>, St)>
{
type Item = It;
type Error = Fut::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
match self.state.take().expect("polling stream twice") {
State::Ready(state) => {
let fut = (self.f)(state).into_future();
self.state = Some(State::Working(fut));
},
State::Working(mut fut) => {
match fut.poll() {
Err(e) => {
self.state = Some(State::Done(None));
return Err(e);
},
Ok(Async::NotReady) => {
self.state = Some(State::Working(fut));
return Ok(Async::NotReady);
},
Ok(Async::Ready((None, state))) => {
self.state = Some(State::Done(Some(state)));
return Ok(Async::Ready(None));
},
Ok(Async::Ready((Some(item), state))) => {
self.state = Some(State::Ready(state));
return Ok(Async::Ready(Some(item)));
}
}
}
State::Done(_) => {
return Ok(Async::Ready(None))
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment