Created
September 24, 2022 13:41
-
-
Save jakobrs/d48d51f823c155073ce1c95e270f97d3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type StreamFrame<S> = (S, Option<<S as Stream>::Item>); | |
type StreamsUnorderedPollingFuture<S: Stream + Unpin> = impl Future<Output = StreamFrame<S>>; | |
pub struct StreamsUnordered<S: Stream + Unpin> { | |
streams: FuturesUnordered<StreamsUnorderedPollingFuture<S>>, | |
} | |
fn create_streams_unordered_polling_future<S: Stream + Unpin>( | |
mut stream: S, | |
) -> StreamsUnorderedPollingFuture<S> { | |
async move { | |
let next = stream.next().await; | |
(stream, next) | |
} | |
} | |
impl<S> StreamsUnordered<S> | |
where | |
S: Stream + Unpin + 'static, | |
{ | |
pub fn new() -> Self { | |
Self { | |
streams: FuturesUnordered::new(), | |
} | |
} | |
pub fn push(&mut self, stream: S) { | |
self.streams | |
.push(create_streams_unordered_polling_future(stream)); | |
} | |
pub fn streams( | |
self: Pin<&mut Self>, | |
) -> Pin<&mut FuturesUnordered<StreamsUnorderedPollingFuture<S>>> { | |
unsafe { self.map_unchecked_mut(|s| &mut s.streams) } | |
} | |
} | |
impl<S> Default for StreamsUnordered<S> | |
where | |
S: Stream + Unpin + 'static, | |
{ | |
fn default() -> Self { | |
Self::new() | |
} | |
} | |
impl<S> Stream for StreamsUnordered<S> | |
where | |
S: Stream + Unpin + 'static, | |
{ | |
type Item = S::Item; | |
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { | |
loop { | |
match self.as_mut().streams().poll_next(cx) { | |
Poll::Pending => return Poll::Pending, | |
Poll::Ready(Some((_stream, None))) => (), | |
Poll::Ready(Some((stream, Some(item)))) => { | |
self.streams() | |
.push(create_streams_unordered_polling_future(stream)); | |
return Poll::Ready(Some(item)); | |
} | |
Poll::Ready(None) => return Poll::Ready(None), | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment