Created
September 7, 2020 07:20
-
-
Save ritesh/d0ea5f7babf2dcab594d259391dee92c to your computer and use it in GitHub Desktop.
S3 pagination rust
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures::{stream, Stream, TryStreamExt}; | |
use rusoto_core::RusotoError; | |
use rusoto_core::credential::ChainProvider; | |
use rusoto_core::request::HttpClient; | |
use rusoto_core::Region; | |
use rusoto_s3::{ListObjectsV2Error, ListObjectsV2Request, Object, S3, S3Client}; | |
use std::{pin::Pin}; | |
//Lifted from here | |
//https://github.com/softprops/dynomite/blob/master/dynomite/src/ext.rs | |
// S3Stream provides streaming APIs for S3 client operations. | |
// | |
type S3Stream<I, E> = Pin<Box<dyn Stream<Item = Result<I, RusotoError<E>>> + Send>>; | |
/// Extension methods for S3 client type | |
/// | |
/// A default impl is provided for `S3 Clone + Send + Sync + 'static` which adds autopaginating `Stream` interfaces that require | |
/// taking ownership. | |
pub trait S3Ext { | |
/// An auto-paginating `Stream` oriented version of `list_objects_v2` | |
fn list_objects_v2_pages( | |
self, | |
input: ListObjectsV2Request, | |
) -> S3Stream<Object, ListObjectsV2Error>; | |
} | |
impl<S> S3Ext for S | |
where | |
S: S3 + Clone + Send + Sync + 'static, | |
{ | |
//The way this works is that we use stream::try_unfold, which takes as input a "seed" object | |
//and a closure. The closure is called with the seed and then we wait for the closure to return | |
//(a, b) where a is the value to yield (in our case an S3 Object name) and b is the next | |
//internal state which the closure is called with again. | |
//If the closure returns None, instead of Some(TryFuture) the streaming stops | |
fn list_objects_v2_pages( | |
self, | |
input: ListObjectsV2Request, | |
) -> S3Stream<Object, ListObjectsV2Error> { | |
enum PageState { | |
Next(Option<String>, ListObjectsV2Request), | |
End, | |
} | |
println!("continuation token {:?}", input.continuation_token); | |
Box::pin( | |
stream::try_unfold( | |
PageState::Next( | |
input.continuation_token.clone(), | |
input, | |
), | |
//This returns either a None or Some(TryFuture) | |
move |state| { | |
let clone = self.clone(); | |
async move { | |
let (continuation_token, input) = match state { | |
PageState::Next(start, input) => (start, input), | |
PageState::End => { | |
return Ok(None) as Result<_, RusotoError<ListObjectsV2Error>> | |
} | |
}; | |
let resp = clone | |
.list_objects_v2(ListObjectsV2Request { | |
continuation_token: continuation_token.clone(), | |
..input.clone() | |
}) | |
.await?; | |
let next_state = match resp.next_continuation_token { | |
Some(continuation_token) => PageState::Next(Some(continuation_token), input), | |
_ => PageState::End, | |
}; | |
Ok(Some(( | |
stream::iter(resp.contents.unwrap_or_default().into_iter().map(Ok)), | |
next_state, | |
))) | |
} | |
}, | |
) | |
.try_flatten(), | |
) | |
} | |
} | |
#[tokio::main] | |
async fn main() { | |
let credprovider = ChainProvider::new(); | |
let client = S3Client::new_with( | |
HttpClient::new().expect("Failed to create HTTP client"), | |
credprovider, | |
Region::UsEast1, | |
); | |
let listobjinput = ListObjectsV2Request { | |
bucket: "my-cool-bucket".to_owned(), | |
..Default::default() | |
}; | |
let mut foo = client.list_objects_v2_pages(listobjinput); | |
let abc: Result<Vec<Object>, RusotoError<ListObjectsV2Error>> = foo.try_collect().await; | |
match abc { | |
Ok(v) => println!("Vector length is {}", v.len()), | |
Err(_)=> println!("Failed") | |
}; | |
println!("Done"); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment