Created
July 17, 2020 09:31
-
-
Save rklaehn/b0e963c2e7099b6c878adaba8e676ae0 to your computer and use it in GitHub Desktop.
stream life extension hack
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures::prelude::*; | |
use std::{rc::Rc, task::{Context, Poll}, cell::RefCell, pin::Pin}; | |
fn foo<'a>(c: &'a mut u64) -> impl Stream<Item = u64> + 'a { | |
stream::unfold(10u64, move |max| { | |
future::ready(if *c < max { | |
*c += 1; | |
Some((*c, max)) | |
} else { | |
None | |
}) | |
}) | |
} | |
fn foo2(arg: u64) -> impl Stream<Item = u64> { | |
let snd: Rc<RefCell<Option<u64>>> = Rc::new(RefCell::new(None)); | |
let rcv = snd.clone(); | |
let mut fut = async move { | |
let mut arg = arg; | |
let mut stream = foo(&mut arg); | |
while let Some(item) = stream.next().await { | |
*snd.borrow_mut() = Some(item); | |
yield_now().await; | |
} | |
}.boxed_local(); | |
let mut done = false; | |
futures::stream::poll_fn(move |ctx| { | |
if done { | |
return Poll::Ready(None) | |
} | |
match fut.poll_unpin(ctx) { | |
Poll::Pending => { | |
if let Some(item) = rcv.borrow_mut().take() { | |
Poll::Ready(Some(item)) | |
} else { | |
Poll::Pending | |
} | |
} | |
Poll::Ready(_) => { | |
done = true; | |
if let Some(item) = rcv.borrow_mut().take() { | |
Poll::Ready(Some(item)) | |
} else { | |
Poll::Ready(None) | |
} | |
} | |
} | |
}) | |
} | |
async fn test_foo_foo2() { | |
let stream = foo2(5); | |
assert_eq!(stream.collect::<Vec<_>>().await, vec![6,7,8,9,10]); | |
} | |
pub async fn yield_now() { | |
YieldNow(false).await | |
} | |
struct YieldNow(bool); | |
impl Future for YieldNow { | |
type Output = (); | |
// The futures executor is implemented as a FIFO queue, so all this future | |
// does is re-schedule the future back to the end of the queue, giving room | |
// for other futures to progress. | |
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
if !self.0 { | |
self.0 = true; | |
cx.waker().wake_by_ref(); | |
Poll::Pending | |
} else { | |
Poll::Ready(()) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment