Skip to content

Instantly share code, notes, and snippets.

@jakobrs
Created September 24, 2022 13:41
Show Gist options
  • Save jakobrs/d48d51f823c155073ce1c95e270f97d3 to your computer and use it in GitHub Desktop.
Save jakobrs/d48d51f823c155073ce1c95e270f97d3 to your computer and use it in GitHub Desktop.
type StreamFrame<S> = (S, Option<<S as Stream>::Item>);
type StreamsUnorderedPollingFuture<S: Stream + Unpin> = impl Future<Output = StreamFrame<S>>;
pub struct StreamsUnordered<S: Stream + Unpin> {
streams: FuturesUnordered<StreamsUnorderedPollingFuture<S>>,
}
fn create_streams_unordered_polling_future<S: Stream + Unpin>(
mut stream: S,
) -> StreamsUnorderedPollingFuture<S> {
async move {
let next = stream.next().await;
(stream, next)
}
}
impl<S> StreamsUnordered<S>
where
S: Stream + Unpin + 'static,
{
pub fn new() -> Self {
Self {
streams: FuturesUnordered::new(),
}
}
pub fn push(&mut self, stream: S) {
self.streams
.push(create_streams_unordered_polling_future(stream));
}
pub fn streams(
self: Pin<&mut Self>,
) -> Pin<&mut FuturesUnordered<StreamsUnorderedPollingFuture<S>>> {
unsafe { self.map_unchecked_mut(|s| &mut s.streams) }
}
}
impl<S> Default for StreamsUnordered<S>
where
S: Stream + Unpin + 'static,
{
fn default() -> Self {
Self::new()
}
}
impl<S> Stream for StreamsUnordered<S>
where
S: Stream + Unpin + 'static,
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match self.as_mut().streams().poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Some((_stream, None))) => (),
Poll::Ready(Some((stream, Some(item)))) => {
self.streams()
.push(create_streams_unordered_polling_future(stream));
return Poll::Ready(Some(item));
}
Poll::Ready(None) => return Poll::Ready(None),
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment