Skip to content

Instantly share code, notes, and snippets.

@jmg-duarte
Created October 3, 2024 20:47
Show Gist options
  • Save jmg-duarte/f606410a5e0314d7b5cee959a240b2d8 to your computer and use it in GitHub Desktop.
Save jmg-duarte/f606410a5e0314d7b5cee959a240b2d8 to your computer and use it in GitHub Desktop.
pin_project! {
/// Pairs a reader with a byte buffer; when `R` implements `AsyncRead`, this
/// type is a `Stream` whose items are `io::Result<BytesMut>` blocks.
pub struct BlockStream<R> {
// Underlying byte source. Marked `#[pin]` so the projection hands out a
// pinned reference to it.
#[pin]
reader: R,
// Block currently being filled. The `Stream` impl treats the capacity of
// this buffer as the block size, and a capacity of zero as "stream finished".
buf: BytesMut,
}
}
/// Yields the reader's bytes as consecutive blocks.
///
/// Every item is a [`BytesMut`] filled up to the capacity of the internal
/// buffer; only the last block before end-of-input may be shorter. A read
/// error is yielded as `Some(Err(_))`; bytes buffered before the error stay in
/// the internal buffer. Once end-of-input has been observed the stream keeps
/// returning `None` without polling the reader again.
impl<R: AsyncRead> Stream for BlockStream<R> {
    type Item = io::Result<BytesMut>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // A zero-capacity buffer is the "finished" marker installed at end-of-input.
        if this.buf.capacity() == 0 {
            return Poll::Ready(None);
        }

        loop {
            let mut buf = ReadBuf::uninit(this.buf.spare_capacity_mut());
            // Remember where the read buffer starts so a swapped-out `ReadBuf`
            // can be detected after the reader returns.
            let start = buf.filled().as_ptr();
            task::ready!(this.reader.as_mut().poll_read(cx, &mut buf))?;
            // A reader is handed `&mut ReadBuf` and could replace it with one
            // backed by different memory; `filled` would then say nothing
            // about our own spare capacity, so refuse to continue.
            assert_eq!(
                start,
                buf.filled().as_ptr(),
                "reader replaced the ReadBuf it was given"
            );
            let filled = buf.filled().len();
            // SAFETY: `buf` was created over exactly the spare capacity of
            // `this.buf` and (checked above) still points at it, so the
            // `filled` bytes following the current length have been
            // initialised and `len + filled` cannot exceed the capacity.
            unsafe { this.buf.set_len(this.buf.len() + filled) };

            if this.buf.is_empty() {
                // End-of-input with nothing pending: install the finished
                // marker so later polls do not read past end-of-input.
                *this.buf = BytesMut::with_capacity(0);
                break Poll::Ready(None);
            } else if filled == 0 || this.buf.len() == this.buf.capacity() {
                // Either end-of-input with a partial block (`filled == 0`) or
                // a full block. In the first case the replacement buffer gets
                // capacity 0, which ends the stream on the next poll.
                let new_capacity = if filled == 0 { 0 } else { this.buf.capacity() };
                let block = mem::replace(this.buf, BytesMut::with_capacity(new_capacity));
                break Poll::Ready(Some(Ok(block)));
            }
        }
    }
}
use bytes::BytesMut;
use futures_core::Stream;
use pin_project_lite::pin_project;
use std::io;
use std::mem;
use std::pin::Pin;
use std::task;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::io::ReadBuf;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment