Created August 24, 2022 17:52
use axum::{
    body::Bytes,
    extract::{self, multipart::MultipartError},
    Extension,
};
use futures::Stream;
use reqwest::StatusCode;

async fn upload(
    mp: extract::Multipart,
    Extension(client): Extension<reqwest::Client>,
) -> Result<String, (StatusCode, String)> {
    let streaming = MultipartStream { mp }.into_stream();
    let streaming_body = reqwest::Body::wrap_stream(streaming);
    client
        .put(url::Url::parse("https://example.com/file").unwrap())
        .body(streaming_body)
        .send()
        .await
        .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Oops".to_string()))
        .map(|_| "Yay".to_string())
}

/// Wrapper around Multipart that impls Stream in a 'static way.
struct MultipartStream {
    mp: extract::Multipart,
}

impl MultipartStream {
    /// Owns the Multipart, so it's 'static
    fn into_stream(mut self) -> impl Stream<Item = Result<Bytes, MultipartError>> {
        async_stream::stream! {
            while let Some(field) = self.mp.next_field().await.unwrap() {
                // `field` is Field<'a>, it references data owned by `self.mp`.
                // It impls Stream, but it isn't 'static, so it cannot be used as a Reqwest body directly.
                // Luckily, the return value of this function _is_ 'static.
                for await value in field {
                    yield value;
                }
            }
        }
    }
}
This is in response to a question in the reqwest discord -- how do you write an Axum endpoint that accepts a multipart request body and streams it into an outgoing reqwest body?
It sounds simple, but it's actually pretty hard. Why? Well, the incoming request's multipart body contains several fields. Each is a `Field<'a>`, where 'a is the lifetime of the parent `Multipart`. OK, so fields reference data from the Multipart, and therefore cannot outlive the Multipart. Makes sense.

But reqwest only lets you send a body stream which is `'static` (i.e. not borrowed, or borrowed for the entire time your program runs). Why? Because of reqwest's particular design: connections have their own task, so when you send a body, it gets moved into that connection's dedicated task. It can't reference data from a different task T1, because what if T1 goes away before the connection task finishes sending?

So you can't use the Field stream as a reqwest body, because it's borrowed.
The solution is to make a new wrapper stream, which owns the Multipart and implements its own Stream. This stream iterates over each Multipart field and then yields from the per-field streams. Because the wrapper owns all the data, the resulting stream is 'static and can be used as a reqwest body :)
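
To see the borrowed-versus-owned distinction in isolation, here is a minimal std-only sketch. It is an analogy, not the gist's code and not reqwest's API: `Parts`, `chunks`, `into_chunks`, `send_body` and `demo` are all hypothetical names, plain iterators stand in for streams, and `std::thread::spawn` stands in for the connection task because it also demands `'static` data.

```rust
use std::thread;

/// Hypothetical stand-in for `Multipart`: owns the bytes of every field.
struct Parts {
    fields: Vec<Vec<u8>>,
}

impl Parts {
    /// Borrowing iterator, analogous to `Field<'a>`: it cannot outlive `self`.
    fn chunks<'a>(&'a self) -> impl Iterator<Item = &'a [u8]> + 'a {
        self.fields.iter().map(|f| f.as_slice())
    }

    /// Owning iterator, analogous to the wrapper stream: it consumes `self`,
    /// so nothing is borrowed and the result is 'static.
    fn into_chunks(self) -> impl Iterator<Item = Vec<u8>> + 'static {
        self.fields.into_iter()
    }
}

/// Hypothetical stand-in for the connection task: the body is moved onto
/// another thread, so it must be `Send + 'static`. Returns the bytes "sent".
fn send_body<I, B>(body: I) -> usize
where
    I: Iterator<Item = B> + Send + 'static,
    B: AsRef<[u8]>,
{
    thread::spawn(move || body.map(|chunk| chunk.as_ref().len()).sum::<usize>())
        .join()
        .expect("sender thread panicked")
}

fn demo() -> usize {
    let parts = Parts {
        fields: vec![b"hello".to_vec(), b"world!".to_vec()],
    };

    // Borrowing is fine while everything stays in this function.
    let local_len: usize = parts.chunks().map(|c| c.len()).sum();
    assert_eq!(local_len, 11);

    // send_body(parts.chunks()); // rejected by the compiler: borrows `parts`

    // Handing over ownership satisfies the 'static bound.
    send_body(parts.into_chunks())
}
```

Uncommenting the `send_body(parts.chunks())` line should fail with a "borrowed value does not live long enough" style error, which is the same class of problem as passing a `Field<'a>` to reqwest.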