Skip to content

Instantly share code, notes, and snippets.

@marceloneppel
Created September 11, 2024 10:43
Show Gist options
  • Save marceloneppel/4d32f5a99a32c5c116c72228e1e1b65e to your computer and use it in GitHub Desktop.
Save marceloneppel/4d32f5a99a32c5c116c72228e1e1b65e to your computer and use it in GitHub Desktop.
For R2
diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
index 20d498d678..004a49a006 100644
--- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -376,10 +376,32 @@ impl OpendalStreamingUploader {
impl StreamingUploader for OpendalStreamingUploader {
async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
assert!(self.is_valid);
- self.not_uploaded_len += data.len();
- self.buf.push(data);
- if self.not_uploaded_len >= Self::UPLOAD_BUFFER_SIZE {
- self.flush().await?;
+ if true {
+ let parts_to_upload_len = self.not_uploaded_len + data.len();
+ if parts_to_upload_len < Self::UPLOAD_BUFFER_SIZE {
+ // Store everything and flush later.
+ self.not_uploaded_len += data.len();
+ self.buf.push(data);
+ } else if parts_to_upload_len == Self::UPLOAD_BUFFER_SIZE {
+ // Store everything and flush now.
+ self.buf.push(data);
+ self.flush().await?;
+ } else {
+ // Split the data and store the first part, then flush.
+ let data_part_to_upload_now = data.clone().into_iter().take(Self::UPLOAD_BUFFER_SIZE - self.not_uploaded_len).collect::<Bytes>();
+ let data_part_to_upload_later = data.clone().into_iter().skip(data_part_to_upload_now.len()).collect::<Bytes>();
+ self.buf.push(data_part_to_upload_now);
+ self.flush().await?;
+ // Store the rest of the data and flush later.
+ self.not_uploaded_len = data_part_to_upload_later.len();
+ self.buf.push(data_part_to_upload_later);
+ }
+ } else {
+ self.not_uploaded_len += data.len();
+ self.buf.push(data);
+ if self.not_uploaded_len >= Self::UPLOAD_BUFFER_SIZE {
+ self.flush().await?;
+ }
}
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment