Created
September 11, 2024 10:43
-
-
Save marceloneppel/4d32f5a99a32c5c116c72228e1e1b65e to your computer and use it in GitHub Desktop.
For R2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
index 20d498d678..004a49a006 100644
--- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -376,10 +376,28 @@ impl OpendalStreamingUploader {
 impl StreamingUploader for OpendalStreamingUploader {
     async fn write_bytes(&mut self, data: Bytes) -> ObjectResult<()> {
         assert!(self.is_valid);
-        self.not_uploaded_len += data.len();
-        self.buf.push(data);
-        if self.not_uploaded_len >= Self::UPLOAD_BUFFER_SIZE {
-            self.flush().await?;
+        // Cut the stream into parts of exactly `UPLOAD_BUFFER_SIZE` bytes so that
+        // every part flushed from here has the same size; only the remainder left
+        // in `buf` for a later flush may be shorter.
+        // NOTE(review): assumes `flush()` empties `buf` and resets
+        // `not_uploaded_len`, as the pre-patch code already relied on -- confirm.
+        let mut data = data;
+        loop {
+            let room = Self::UPLOAD_BUFFER_SIZE - self.not_uploaded_len;
+            if data.len() < room {
+                // Not enough for a full part yet: buffer it and flush later.
+                self.not_uploaded_len += data.len();
+                self.buf.push(data);
+                break;
+            }
+            // `split_to` is O(1): it shares the allocation instead of copying.
+            let part = data.split_to(room);
+            self.not_uploaded_len += part.len();
+            self.buf.push(part);
+            self.flush().await?;
+            if data.is_empty() {
+                break;
+            }
         }
         Ok(())
     }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment