Skip to content

Instantly share code, notes, and snippets.

@mvirkkunen
Last active March 21, 2020 19:46
Show Gist options
  • Save mvirkkunen/abeb9ea12edace7db25e80c36c03d3a3 to your computer and use it in GitHub Desktop.
Save mvirkkunen/abeb9ea12edace7db25e80c36c03d3a3 to your computer and use it in GitHub Desktop.
mod stream_body {
use std::marker::Unpin;
use std::pin::Pin;
use std::task::*;
use bytes::*;
use futures_core::{Stream, ready};
use http_body::{Body, SizeHint};
use warp::hyper::HeaderMap;
/// Adapter that pairs a stream with an optional count of bytes still expected from it.
///
/// `Debug` is derived so the public type can be inspected in logs and assertions; the
/// generated impl only requires `S: Debug` and places no bound on the struct itself.
#[derive(Debug)]
pub struct StreamBody<S> {
    /// The wrapped stream that produces the body chunks.
    stream: S,
    /// Bytes still expected from `stream`, or `None` when the total length is unknown.
    remaining: Option<u64>,
}
/// Wraps `stream` in a [`StreamBody`], seeding its remaining-byte count from `header`.
///
/// The count is read from the `Content-Length` header. It is `None` when the header is
/// absent, when `to_str` rejects the header value, or when the text does not parse as a
/// `u64`.
pub fn create<S, B>(header: &warp::hyper::HeaderMap, stream: S) -> StreamBody<S>
where
    S: Stream<Item = Result<B, warp::Error>> + Unpin,
    B: Buf,
{
    // Every step is fallible, so chain with `and_then` rather than building a nested
    // `Option<Option<u64>>` and flattening it afterwards.
    let remaining = header
        .get(warp::hyper::http::header::CONTENT_LENGTH)
        .and_then(|value| value.to_str().ok())
        .and_then(|text| text.parse::<u64>().ok());

    StreamBody { stream, remaining }
}
impl<S, B> Body for StreamBody<S>
where
S: Stream<Item = Result<B, warp::Error>> + Unpin,
B: Buf,
{
type Data = B;
type Error = warp::Error;
fn poll_data(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Result<Self::Data, Self::Error>>>
{
let item = ready!(Pin::new(&mut self.as_mut().stream).poll_next(cx));
if let Some(Ok(ref buf)) = item {
self.as_mut()
.remaining
.as_mut()
.map(|r| *r = r.saturating_sub(buf.remaining() as u64));
}
Poll::Ready(item)
}
fn poll_trailers(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Result<Option<HeaderMap>, Self::Error>>
{
// Well I sure hope there aren't any because warp hides them from us
Poll::Ready(Ok(None))
}
fn size_hint(&self) -> SizeHint {
self.remaining
.map(SizeHint::with_exact)
.unwrap_or_default()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment