Created
October 3, 2024 20:47
-
-
Save jmg-duarte/f606410a5e0314d7b5cee959a240b2d8 to your computer and use it in GitHub Desktop.
Reader -> Stream<Bytes> by Sabrina Jewson (https://discord.com/channels/273534239310479360/1291460730985512961/1291477536194170983)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pin_project! { | |
pub struct BlockStream<R> { | |
#[pin] | |
reader: R, | |
buf: BytesMut, | |
} | |
} | |
impl<R: AsyncRead> Stream for BlockStream<R> { | |
type Item = io::Result<BytesMut>; | |
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { | |
let mut this = self.project(); | |
if this.buf.capacity() == 0 { | |
return Poll::Ready(None); | |
} | |
loop { | |
let mut buf = ReadBuf::uninit(this.buf.spare_capacity_mut()); | |
task::ready!(this.reader.as_mut().poll_read(cx, &mut buf))?; | |
let filled = buf.filled().len(); | |
unsafe { this.buf.set_len(this.buf.len() + filled) }; | |
if this.buf.is_empty() { | |
break Poll::Ready(None); | |
} else if filled == 0 || this.buf.len() == this.buf.capacity() { | |
let new_capacity = if filled == 0 { 0 } else { this.buf.capacity() }; | |
let block = mem::replace(this.buf, BytesMut::with_capacity(new_capacity)); | |
break Poll::Ready(Some(Ok(block))); | |
} | |
} | |
} | |
} | |
use bytes::BytesMut; | |
use futures_core::Stream; | |
use pin_project_lite::pin_project; | |
use std::io; | |
use std::mem; | |
use std::pin::Pin; | |
use std::task; | |
use std::task::Poll; | |
use tokio::io::AsyncRead; | |
use tokio::io::ReadBuf; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment