Last active
July 9, 2024 18:09
-
-
Save TimLuq/83a35453405f4c6e0f63fb2a0caa9f6e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use core::future::Future; | |
use core::pin::Pin; | |
use core::task::Poll; | |
use futures_core::Stream; | |
/// Item yielded by the stream for each completed computation step.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamItem {
    /// Number of bytes the computation step appended to the shared buffer.
    pub append_length: usize,
}
/// State threaded through the sequential item computations.
///
/// Each computation takes the state by value and hands it back, modified,
/// together with the item it produced.
#[derive(Debug, Default)]
struct StreamItemState {
    /// Number of items computed so far; incremented once per computation.
    index: u8,
    /// Bytes accumulated across all computations so far.
    buffer: Vec<u8>,
}
/// The struct to keep state and implement `Stream`. | |
pub struct TestStream { | |
aggregation_state: Option<Vec<u8>>, | |
item_state: Option<Pin<Box<dyn Future<Output = (StreamItemState, StreamItem)>>>>, | |
} | |
impl TestStream { | |
/// Computes a single item from a state. Returns the modified state and the item after completion. | |
async fn compute_item(mut state: StreamItemState) -> (StreamItemState, StreamItem) { | |
let i = state.index; | |
let data: Vec<u8> = vec![i]; // = fs::read(&format("file-{}.txt", i)).await.unwrap(); | |
state.buffer.extend_from_slice(&data); | |
state.index += 1; | |
(state, StreamItem { | |
append_length: data.len() | |
}) | |
} | |
/// Create a new instance of this amazing stream. | |
pub fn new() -> Self { | |
let init_state: StreamItemState = Default::default(); | |
Self { | |
aggregation_state: None, | |
item_state: Some(Box::pin(TestStream::compute_item(init_state))) | |
} | |
} | |
/// After all items have been processed this will return the aggregated data. | |
pub fn aggregation(&self) -> Option<&Vec<u8>> { | |
self.aggregation_state.as_ref() | |
} | |
} | |
impl Stream for TestStream { | |
type Item = StreamItem; | |
fn poll_next( | |
mut self: core::pin::Pin<&mut Self>, | |
cx: &mut core::task::Context, | |
) -> core::task::Poll<Option<Self::Item>> { | |
let slf = std::ops::DerefMut::deref_mut(&mut self); | |
let (state, res) = if let Some(fut) = slf.item_state.as_mut() { | |
match Future::poll(fut.as_mut(), cx) { | |
// return and keep waiting for result | |
Poll::Pending => return Poll::Pending, | |
// item computation complete | |
Poll::Ready((state, res)) => (state, res), | |
} | |
} else { | |
// no items left | |
return Poll::Ready(None) | |
}; | |
// prepare next state | |
slf.item_state = if state.index >= 16 { | |
slf.aggregation_state = Some(state.buffer); | |
None | |
} else { | |
Some(Box::pin(TestStream::compute_item(state))) | |
}; | |
// return item | |
Poll::Ready(Some(res)) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment