Last active
December 2, 2021 15:25
-
-
Save conradludgate/f2dc931b4eb663570372d1aad06af4ec to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![feature(generators, generator_trait)] | |
use std::ops::Generator; | |
use std::pin::Pin; | |
use std::task::{Context, Poll}; | |
use std::time::Duration; | |
use futures_core::stream::Stream; | |
use futures_core::{FusedStream, Future}; | |
use futures_util::{pin_mut, StreamExt}; | |
use pin_project::pin_project; | |
fn zero_to_three() -> impl Stream<Item = u32> { | |
AsyncStream::new(|| { | |
for i in 0..3 { | |
let fut = async move { | |
tokio::time::sleep(Duration::from_secs(1)).await; | |
i | |
}; | |
yield fut; | |
} | |
}) | |
} | |
#[tokio::main] | |
async fn main() { | |
let s = zero_to_three(); | |
pin_mut!(s); | |
while let Some(value) = s.next().await { | |
println!("got {}", value); | |
} | |
} | |
#[derive(Debug)] | |
#[pin_project] | |
pub struct AsyncStream<G: Generator<Return = ()>> | |
where | |
G::Yield: Future, | |
{ | |
#[pin] | |
fut: Option<G::Yield>, | |
#[pin] | |
generator: Option<G>, | |
} | |
impl<G: Generator<Return = ()>> AsyncStream<G> | |
where | |
G::Yield: Future, | |
{ | |
pub fn new(generator: G) -> Self { | |
Self { | |
fut: None, | |
generator: Some(generator), | |
} | |
} | |
} | |
impl<G> FusedStream for AsyncStream<G> | |
where | |
G: Generator<Return = ()>, | |
G::Yield: Future, | |
{ | |
fn is_terminated(&self) -> bool { | |
self.generator.is_none() | |
} | |
} | |
impl<G> Stream for AsyncStream<G> | |
where | |
G: Generator<Return = ()>, | |
G::Yield: Future, | |
{ | |
type Item = <G::Yield as Future>::Output; | |
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
let mut this = self.project(); | |
// poll future straight away | |
if let Some(p) = Self::poll_fut(&mut this.fut, cx) { | |
// return result of poll | |
return Self::map_fut_poll(p, &mut this.generator, &mut this.fut); | |
} | |
// poll generator for next future | |
if Self::resume_gen(&mut this.generator, &mut this.fut) { | |
let poll = Self::poll_fut(&mut this.fut, cx).unwrap(); | |
Self::map_fut_poll(poll, &mut this.generator, &mut this.fut) | |
} else { | |
Poll::Ready(None) | |
} | |
} | |
} | |
impl<G> AsyncStream<G> | |
where | |
G: Generator<Return = ()>, | |
G::Yield: Future, | |
{ | |
/// Maps the poll from the future into a poll for the stream | |
/// Also eagerly prepares the next future for polling | |
fn map_fut_poll( | |
poll: Poll<<G::Yield as Future>::Output>, | |
opt_gen: &mut Pin<&mut Option<G>>, | |
opt_fut: &mut Pin<&mut Option<G::Yield>>, | |
) -> Poll<Option<<G::Yield as Future>::Output>> { | |
match poll { | |
Poll::Ready(r) => { | |
Self::resume_gen(opt_gen, opt_fut); | |
Poll::Ready(Some(r)) | |
} | |
Poll::Pending => Poll::Pending, | |
} | |
} | |
/// resumes the generator to get the next future | |
/// if generator is complete, it's dropped. | |
/// Returns whether a new future is available to poll | |
fn resume_gen( | |
opt_gen: &mut Pin<&mut Option<G>>, | |
opt_fut: &mut Pin<&mut Option<G::Yield>>, | |
) -> bool { | |
match opt_gen.as_mut().as_pin_mut() { | |
Some(gen) => match gen.resume(()) { | |
std::ops::GeneratorState::Yielded(i) => { | |
opt_fut.set(Some(i)); | |
true | |
} | |
std::ops::GeneratorState::Complete(_) => { | |
opt_gen.set(None); | |
false | |
} | |
}, | |
None => false, | |
} | |
} | |
/// polls the future. If the response is ready, it drops the future. | |
fn poll_fut( | |
opt_fut: &mut Pin<&mut Option<G::Yield>>, | |
cx: &mut Context<'_>, | |
) -> Option<Poll<<G::Yield as Future>::Output>> { | |
if let Some(fut) = opt_fut.as_mut().as_pin_mut() { | |
match fut.poll(cx) { | |
Poll::Ready(r) => { | |
opt_fut.set(None); | |
Some(Poll::Ready(r)) | |
} | |
Poll::Pending => Some(Poll::Pending), | |
} | |
} else { | |
None | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment