// Gist coord-e/7794bf029d872d17f315e4d90b670134 (last active March 13, 2021 09:56):
// "Upload to S3 (minio) via warp".
//
// GitHub notice: this file contains hidden or bidirectional Unicode text that may be
// interpreted or compiled differently than what appears below. To review it, open the
// file in an editor that reveals hidden Unicode characters.
use std::convert::TryInto; | |
use std::io; | |
use std::net::SocketAddr; | |
use std::sync::Arc; | |
use anyhow::{bail, Context}; | |
use bytes::{Buf, BufMut, BytesMut}; | |
use futures::stream::{self, Stream, TryStreamExt}; | |
use mime::Mime; | |
use mpart_async::server::MultipartStream; | |
use rusoto_s3::{S3Client, S3}; | |
use structopt::StructOpt; | |
use tracing::{event, Level}; | |
use warp::Filter; | |
#[derive(Debug, Clone, StructOpt)] | |
struct Opt { | |
#[structopt(long, env = "TEST_SERVER_BIND")] | |
bind: SocketAddr, | |
#[structopt(short, long, default_value = "test")] | |
bucket: String, | |
#[structopt(long, env = "TEST_MINIO_ENDPOINT")] | |
endpoint: String, | |
#[structopt(long, env = "TEST_MINIO_ACCESS_KEY")] | |
access_key: String, | |
#[structopt(long, env = "TEST_MINIO_ACCESS_SECRET")] | |
access_secret: String, | |
} | |
async fn collect_part_stream<E>( | |
mut stream: impl Stream<Item = Result<impl Buf, E>> + Unpin, | |
) -> Result<Vec<u8>, E> { | |
let mut result = BytesMut::new(); | |
while let Some(buf) = stream.try_next().await? { | |
result.put(buf); | |
} | |
Ok(result.to_vec()) | |
} | |
async fn stream_handler( | |
opt: Opt, | |
client: S3Client, | |
mime: Mime, | |
body: impl Stream<Item = Result<impl Buf, warp::Error>> + Send + Sync + Unpin + 'static, | |
) -> anyhow::Result<()> { | |
let boundary = mime | |
.get_param("boundary") | |
.context("no boundary")? | |
.to_string(); | |
let mut data = MultipartStream::new( | |
boundary, | |
body.map_ok(|mut buf| buf.copy_to_bytes(buf.remaining())), | |
); | |
let mut key = None; | |
let mut stream = None; | |
let mut length = None; | |
let mut content_type = None; | |
while let Ok(Some(part)) = data.try_next().await { | |
let name = part.name().context("no name")?; | |
event!(Level::INFO, %name, "part"); | |
match name { | |
"key" => { | |
let key_bytes = collect_part_stream(part).await?; | |
key = Some(String::from_utf8(key_bytes)?); | |
} | |
"content-length" => { | |
let length_bytes = collect_part_stream(part).await?; | |
length = Some(std::str::from_utf8(&length_bytes)?.parse()?); | |
} | |
"content" => { | |
content_type = Some(part.content_type().context("no content-type")?.to_owned()); | |
stream = Some(part); | |
} | |
name => bail!("unknown key {}", name), | |
} | |
} | |
let key = key.context("no key")?; | |
let length = length.context("no length")?; | |
let stream = stream | |
.context("no content")? | |
.map_err(|err| io::Error::new(io::ErrorKind::Other, err)); | |
event!(Level::INFO, %length, %key, "put object"); | |
let put_request = rusoto_s3::PutObjectRequest { | |
bucket: opt.bucket.clone(), | |
key, | |
body: Some(rusoto_core::ByteStream::new(stream)), | |
content_length: Some(length), | |
content_type, | |
..Default::default() | |
}; | |
client.put_object(put_request).await?; | |
Ok(()) | |
} | |
async fn collect_handler( | |
opt: Opt, | |
client: S3Client, | |
mut data: warp::multipart::FormData, | |
) -> anyhow::Result<()> { | |
let mut key = None; | |
let mut content = None; | |
let mut content_type = None; | |
while let Some(part) = data.try_next().await? { | |
match part.name() { | |
"key" => { | |
let key_bytes = collect_part_stream(part.stream()).await?; | |
key = Some(String::from_utf8(key_bytes)?); | |
} | |
"content" => { | |
content_type = Some(part.content_type().context("no content_type")?.to_owned()); | |
content = Some(collect_part_stream(part.stream()).await?); | |
} | |
name => bail!("unknown key {}", name), | |
} | |
} | |
let key = key.context("no key")?; | |
let content = content.context("no content")?; | |
let put_request = rusoto_s3::PutObjectRequest { | |
bucket: opt.bucket.clone(), | |
key, | |
body: Some(content.into()), | |
content_type, | |
..Default::default() | |
}; | |
client.put_object(put_request).await?; | |
Ok(()) | |
} | |
async fn multipart_handler( | |
opt: Opt, | |
client: S3Client, | |
mime: Mime, | |
body: impl Stream<Item = Result<impl Buf, warp::Error>> + Send + Sync + Unpin + 'static, | |
) -> anyhow::Result<()> { | |
let boundary = mime | |
.get_param("boundary") | |
.context("no boundary")? | |
.to_string(); | |
let mut data = MultipartStream::new( | |
boundary, | |
body.map_ok(|mut buf| buf.copy_to_bytes(buf.remaining())), | |
); | |
let mut key = None; | |
let mut stream = None; | |
let mut content_type = None; | |
while let Some(part) = data.try_next().await? { | |
let name = part.name().context("no name")?; | |
event!(Level::INFO, %name, "part"); | |
match name { | |
"key" => { | |
let key_bytes = collect_part_stream(part).await?; | |
key = Some(String::from_utf8(key_bytes)?); | |
} | |
"content" => { | |
content_type = Some(part.content_type().context("no content_type")?.to_owned()); | |
stream = Some(part); | |
break; | |
} | |
name => bail!("unknown key {}", name), | |
} | |
} | |
let key = key.context("no key")?; | |
let mut stream = stream.context("no content")?; | |
let part_size: u64 = 10 * 1024 * 1024; | |
event!(Level::INFO, "create multipart upload"); | |
let create_multipart_request = rusoto_s3::CreateMultipartUploadRequest { | |
bucket: opt.bucket.clone(), | |
key: key.clone(), | |
content_type, | |
..Default::default() | |
}; | |
let create_multipart_output = client | |
.create_multipart_upload(create_multipart_request) | |
.await?; | |
let upload_id = create_multipart_output.upload_id.context("no upload_id")?; | |
let mut buf = BytesMut::with_capacity((part_size as f64 * 1.5) as usize); | |
let mut part_number = 0; | |
let mut completed_parts = Vec::new(); | |
while let Some(chunk) = stream.try_next().await? { | |
event!(Level::TRACE, len = %chunk.len(), "chunk"); | |
buf.put(chunk); | |
let len: u64 = buf.len().try_into()?; | |
if len >= part_size { | |
event!(Level::INFO, %len, "upload part"); | |
let body = buf.copy_to_bytes(buf.remaining()); | |
let body = rusoto_core::ByteStream::new(stream::once(async move { Ok(body) })); | |
let upload_request = rusoto_s3::UploadPartRequest { | |
bucket: opt.bucket.clone(), | |
key: key.clone(), | |
body: Some(body), | |
content_length: Some(len.try_into()?), | |
part_number, | |
upload_id: upload_id.clone(), | |
..Default::default() | |
}; | |
let output = client.upload_part(upload_request).await?; | |
let e_tag = output.e_tag.context("no e_tag")?; | |
completed_parts.push(rusoto_s3::CompletedPart { | |
e_tag: Some(e_tag), | |
part_number: Some(part_number), | |
}); | |
part_number += 1; | |
} | |
} | |
if buf.has_remaining() { | |
let len = buf.len(); | |
event!(Level::INFO, %len, "upload part (last)"); | |
let body = rusoto_core::ByteStream::new(stream::once(async move { Ok(buf.freeze()) })); | |
let upload_request = rusoto_s3::UploadPartRequest { | |
bucket: opt.bucket.clone(), | |
key: key.clone(), | |
body: Some(body), | |
content_length: Some(len.try_into()?), | |
part_number, | |
upload_id: upload_id.clone(), | |
..Default::default() | |
}; | |
let output = client.upload_part(upload_request).await?; | |
let e_tag = output.e_tag.context("no e_tag")?; | |
completed_parts.push(rusoto_s3::CompletedPart { | |
e_tag: Some(e_tag), | |
part_number: Some(part_number), | |
}); | |
} | |
event!(Level::INFO, "complete multipart upload"); | |
let complete_request = rusoto_s3::CompleteMultipartUploadRequest { | |
bucket: opt.bucket.clone(), | |
key, | |
upload_id, | |
multipart_upload: Some(rusoto_s3::CompletedMultipartUpload { | |
parts: Some(completed_parts), | |
}), | |
..Default::default() | |
}; | |
client.complete_multipart_upload(complete_request).await?; | |
Ok(()) | |
} | |
async fn multipart_concurrent_handler( | |
opt: Opt, | |
client: S3Client, | |
mime: Mime, | |
body: impl Stream<Item = Result<impl Buf, warp::Error>> + Send + Sync + Unpin + 'static, | |
) -> anyhow::Result<()> { | |
let boundary = mime | |
.get_param("boundary") | |
.context("no boundary")? | |
.to_string(); | |
let mut data = MultipartStream::new( | |
boundary, | |
body.map_ok(|mut buf| buf.copy_to_bytes(buf.remaining())), | |
); | |
let mut key = None; | |
let mut stream = None; | |
let mut content_type = None; | |
while let Some(part) = data.try_next().await? { | |
let name = part.name().context("no name")?; | |
event!(Level::INFO, %name, "part"); | |
match name { | |
"key" => { | |
let key_bytes = collect_part_stream(part).await?; | |
key = Some(String::from_utf8(key_bytes)?); | |
} | |
"content" => { | |
content_type = Some(part.content_type().context("no content_type")?.to_owned()); | |
stream = Some(part); | |
break; | |
} | |
name => bail!("unknown key {}", name), | |
} | |
} | |
let key = key.context("no key")?; | |
let mut stream = stream.context("no content")?; | |
let part_size: u64 = 10 * 1024 * 1024; | |
event!(Level::INFO, "create multipart upload"); | |
let create_multipart_request = rusoto_s3::CreateMultipartUploadRequest { | |
bucket: opt.bucket.clone(), | |
key: key.clone(), | |
content_type, | |
..Default::default() | |
}; | |
let create_multipart_output = client | |
.create_multipart_upload(create_multipart_request) | |
.await?; | |
let upload_id = create_multipart_output.upload_id.context("no upload_id")?; | |
let client = Arc::new(client); | |
let mut buf = BytesMut::with_capacity((part_size as f64 * 1.5) as usize); | |
let mut part_number = 0; | |
let mut part_futures = Vec::new(); | |
while let Some(chunk) = stream.try_next().await? { | |
event!(Level::TRACE, len = %chunk.len(), "chunk"); | |
buf.put(chunk); | |
let len: u64 = buf.len().try_into()?; | |
if len >= part_size { | |
let body = buf.copy_to_bytes(buf.remaining()); | |
let fut = { | |
let bucket = opt.bucket.clone(); | |
let upload_id = upload_id.clone(); | |
let key = key.clone(); | |
let client = Arc::clone(&client); | |
async move { | |
event!(Level::INFO, %len, "upload part"); | |
let body = rusoto_core::ByteStream::new(stream::once(async move { Ok(body) })); | |
let upload_request = rusoto_s3::UploadPartRequest { | |
bucket, | |
key, | |
body: Some(body), | |
content_length: Some(len.try_into()?), | |
part_number, | |
upload_id, | |
..Default::default() | |
}; | |
let output = client.upload_part(upload_request).await?; | |
let e_tag = output.e_tag.context("no e_tag")?; | |
Ok::<_, anyhow::Error>(rusoto_s3::CompletedPart { | |
e_tag: Some(e_tag), | |
part_number: Some(part_number), | |
}) | |
} | |
}; | |
part_futures.push(tokio::spawn(fut)); | |
part_number += 1; | |
} | |
} | |
if buf.has_remaining() { | |
let fut = { | |
let bucket = opt.bucket.clone(); | |
let upload_id = upload_id.clone(); | |
let key = key.clone(); | |
let client = Arc::clone(&client); | |
async move { | |
let len = buf.len(); | |
event!(Level::INFO, %len, "upload part (last)"); | |
let body = | |
rusoto_core::ByteStream::new(stream::once(async move { Ok(buf.freeze()) })); | |
let upload_request = rusoto_s3::UploadPartRequest { | |
bucket, | |
key, | |
body: Some(body), | |
content_length: Some(len.try_into()?), | |
part_number, | |
upload_id, | |
..Default::default() | |
}; | |
let output = client.upload_part(upload_request).await?; | |
let e_tag = output.e_tag.context("no e_tag")?; | |
Ok::<_, anyhow::Error>(rusoto_s3::CompletedPart { | |
e_tag: Some(e_tag), | |
part_number: Some(part_number), | |
}) | |
} | |
}; | |
part_futures.push(tokio::spawn(fut)); | |
} | |
let mut completed_parts = Vec::with_capacity(part_futures.len()); | |
for fut in part_futures { | |
completed_parts.push(fut.await??); | |
} | |
event!(Level::INFO, "complete multipart upload"); | |
let complete_request = rusoto_s3::CompleteMultipartUploadRequest { | |
bucket: opt.bucket.clone(), | |
key, | |
upload_id, | |
multipart_upload: Some(rusoto_s3::CompletedMultipartUpload { | |
parts: Some(completed_parts), | |
}), | |
..Default::default() | |
}; | |
client.complete_multipart_upload(complete_request).await?; | |
Ok(()) | |
} | |
/// Custom warp rejection carrying the error returned by an upload handler.
///
/// NOTE(review): the `anyhow::Error` is presumably wrapped in an `Arc` so the
/// type can derive `Clone` (`anyhow::Error` is not `Clone`) — confirm.
#[derive(Debug, Clone)]
struct UploadError(Arc<anyhow::Error>);
// Marker impl that lets `warp::reject::custom` accept an `UploadError`.
impl warp::reject::Reject for UploadError {}
#[tokio::main] | |
async fn main() -> anyhow::Result<()> { | |
let opt = Opt::from_args(); | |
tracing_subscriber::fmt().pretty().init(); | |
let dispatcher = rusoto_core::request::HttpClient::new()?; | |
let credentials = rusoto_credential::StaticProvider::new_minimal( | |
opt.access_key.clone(), | |
opt.access_secret.clone(), | |
); | |
let region = rusoto_core::Region::Custom { | |
name: "us-east-1".to_owned(), | |
endpoint: opt.endpoint.clone(), | |
}; | |
let client = S3Client::new_with(dispatcher, credentials, region); | |
if client | |
.head_bucket(rusoto_s3::HeadBucketRequest { | |
bucket: opt.bucket.clone(), | |
..Default::default() | |
}) | |
.await | |
.is_err() | |
{ | |
event!(Level::INFO, name = %opt.bucket, "create bucket"); | |
let create_bucket_req = rusoto_s3::CreateBucketRequest { | |
bucket: opt.bucket.clone(), | |
..Default::default() | |
}; | |
client.create_bucket(create_bucket_req).await?; | |
} | |
let bind = opt.bind; | |
let with_opt = warp::any().map(move || opt.clone()); | |
let with_client = warp::any().map(move || client.clone()); | |
let filter = warp::post().and(warp::path("upload")).and( | |
warp::path("stream") | |
.and(with_opt.clone()) | |
.and(with_client.clone()) | |
.and(warp::header::<Mime>("content-type")) | |
.and(warp::body::stream()) | |
.and_then(|opt: Opt, client: S3Client, mime: Mime, data| async move { | |
match stream_handler(opt, client, mime, data).await { | |
Ok(()) => Ok("OK"), | |
Err(err) => Err(warp::reject::custom(UploadError(Arc::new(err)))), | |
} | |
}) | |
.or(warp::path("collect") | |
.and(with_opt.clone()) | |
.and(with_client.clone()) | |
.and(warp::multipart::form().max_length(512 * 1024 * 1024)) | |
.and_then( | |
|opt: Opt, client: S3Client, data: warp::multipart::FormData| async move { | |
match collect_handler(opt, client, data).await { | |
Ok(()) => Ok("OK"), | |
Err(err) => Err(warp::reject::custom(UploadError(Arc::new(err)))), | |
} | |
}, | |
)) | |
.or(warp::path("multipart") | |
.and(with_opt.clone()) | |
.and(with_client.clone()) | |
.and(warp::header::<Mime>("content-type")) | |
.and(warp::body::stream()) | |
.and_then(|opt: Opt, client: S3Client, mime: Mime, data| async move { | |
match multipart_handler(opt, client, mime, data).await { | |
Ok(()) => Ok("OK"), | |
Err(err) => Err(warp::reject::custom(UploadError(Arc::new(err)))), | |
} | |
})) | |
.or(warp::path("multipart-concurrent") | |
.and(with_opt) | |
.and(with_client) | |
.and(warp::header::<Mime>("content-type")) | |
.and(warp::body::stream()) | |
.and_then(|opt: Opt, client: S3Client, mime: Mime, data| async move { | |
match multipart_concurrent_handler(opt, client, mime, data).await { | |
Ok(()) => Ok("OK"), | |
Err(err) => Err(warp::reject::custom(UploadError(Arc::new(err)))), | |
} | |
})), | |
); | |
warp::serve(filter).run(bind).await; | |
Ok(()) | |
} |
// Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.