GCS storage operations with gRPC in Rust
use gcloud_sdk::google::storage::v2::{
    bidi_write_object_request, bidi_write_object_response, storage_client::StorageClient,
    write_object_request, BidiWriteObjectRequest, ChecksummedData, Object, ReadObjectRequest,
    StartResumableWriteRequest, WriteObjectRequest, WriteObjectSpec,
};
use tonic::{metadata::MetadataValue, transport::Channel, Request};
use futures::stream;
use tokio_stream::StreamExt;
use tonic::transport::ClientTlsConfig;
// Constants for GCS operations.
// Although the docs recommend 8 MB chunks, the gRPC connection used here could
// not handle messages larger than 4 MB. The sizes are kept slightly below the
// 4 MB limit so the request stays under it once per-message metadata is added;
// roughly 15 extra bytes were observed on write requests.
const MIN_MULTIPART_SIZE: u64 = 4 * 1024 * 1000;
const CHUNK_SIZE: usize = 4 * 1024 * 1000;
const BUCKET_NAME: &str = "test-bucket";
const GOOGLE_AUTH_TOKEN: &str = "token-xxxxxxxx";
struct GcsClient {
    client: StorageClient<Channel>,
    bucket: String,
}
impl GcsClient {
    async fn new(bucket: String) -> Result<Self, Box<dyn std::error::Error>> {
        let channel = Channel::from_static("https://storage.googleapis.com")
            .connect_timeout(std::time::Duration::from_secs(5))
            .timeout(std::time::Duration::from_secs(30))
            .tcp_nodelay(true)
            .http2_adaptive_window(true)
            .http2_keep_alive_interval(std::time::Duration::from_secs(30))
            .tls_config(ClientTlsConfig::new().with_native_roots())?
            .connect()
            .await?;

        Ok(Self {
            client: StorageClient::new(channel),
            bucket,
        })
    }

    fn get_formatted_bucket(&self) -> String {
        format!("projects/_/buckets/{}", self.bucket)
    }

    fn add_common_headers(
        &self,
        metadata: &mut tonic::metadata::MetadataMap,
    ) -> Result<(), Box<dyn std::error::Error>> {
        // Add the authorization header
        let token = GOOGLE_AUTH_TOKEN;
        metadata.insert(
            "authorization",
            MetadataValue::try_from(&format!("Bearer {}", token))?,
        );

        // Add the bucket routing parameter expected by the GCS gRPC API
        let bucket = self.get_formatted_bucket();
        let encoded_bucket = urlencoding::encode(&bucket);
        let params = format!("bucket={}", encoded_bucket);
        metadata.insert(
            "x-goog-request-params",
            MetadataValue::try_from(&params)?,
        );
        Ok(())
    }

    // Wraps a unary request and attaches the common headers.
    async fn auth_request<T>(&self, request: T) -> Request<T> {
        let mut request = Request::new(request);
        self.add_common_headers(request.metadata_mut())
            .expect("Failed to add headers");
        request
    }

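    // Single-shot upload over the client-streaming WriteObject RPC: the first
    // message carries the WriteObjectSpec, the second carries the checksummed
    // payload with finish_write set.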
    async fn simple_upload(
        &mut self,
        object_name: &str,
        data: Vec<u8>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let size = data.len() as i64;
        let bucket = self.get_formatted_bucket();

        // Create the write specification with the fully qualified bucket name
        let write_spec = WriteObjectSpec {
            resource: Some(Object {
                name: object_name.to_string(),
                bucket: bucket.clone(),
                size,
                content_type: "application/octet-stream".to_string(),
                ..Default::default()
            }),
            object_size: Some(size),
            ..Default::default()
        };

        let crc32c = crc32c::crc32c(&data);

        let init_request = WriteObjectRequest {
            first_message: Some(write_object_request::FirstMessage::WriteObjectSpec(write_spec)),
            write_offset: 0,
            data: None,
            finish_write: false,
            ..Default::default()
        };

        let data_request = WriteObjectRequest {
            first_message: None,
            write_offset: 0,
            data: Some(write_object_request::Data::ChecksummedData(ChecksummedData {
                content: data,
                crc32c: Some(crc32c),
            })),
            finish_write: true,
            ..Default::default()
        };

        let request_stream = stream::iter(vec![init_request, data_request]);
        let mut req = Request::new(request_stream);
        // Use the common header function
        self.add_common_headers(req.metadata_mut())?;

        match self.client.write_object(req).await {
            Ok(_) => Ok(()),
            Err(e) => {
                println!("Error details: {:?}", e);
                Err(Box::new(e))
            }
        }
    }

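    // Resumable upload: StartResumableWrite returns an upload ID, then the data
    // is streamed in CHUNK_SIZE pieces over the bidirectional BidiWriteObject RPC.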
    async fn resumable_upload(
        &mut self,
        object_name: &str,
        data: Vec<u8>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let size = data.len() as i64;
        let bucket = self.get_formatted_bucket();

        // Get an upload ID first
        let start_request = StartResumableWriteRequest {
            write_object_spec: Some(WriteObjectSpec {
                resource: Some(Object {
                    name: object_name.to_string(),
                    bucket: bucket.clone(),
                    size,
                    content_type: "application/octet-stream".to_string(),
                    ..Default::default()
                }),
                object_size: Some(size),
                ..Default::default()
            }),
            common_object_request_params: None,
            object_checksums: None,
        };

        let request = self.auth_request(start_request).await;
        let response = self.client.start_resumable_write(request).await?;
        let upload_id = response.into_inner().upload_id;

        // Prepare the BidiWriteObjectRequest messages
        let total_size = data.len();
        let mut offset = 0;
        let mut requests = Vec::new();

        // The first message carries only the upload ID
        requests.push(BidiWriteObjectRequest {
            first_message: Some(bidi_write_object_request::FirstMessage::UploadId(upload_id.clone())),
            write_offset: 0,
            finish_write: false,
            data: None,
            ..Default::default()
        });

        // Split the data into chunks, marking the last one with finish_write
        while offset < total_size {
            let end = std::cmp::min(offset + CHUNK_SIZE, total_size);
            let chunk = data[offset..end].to_vec();
            let is_last = end == total_size;
            let crc32c = crc32c::crc32c(&chunk);

            requests.push(BidiWriteObjectRequest {
                first_message: None,
                write_offset: offset as i64,
                data: Some(bidi_write_object_request::Data::ChecksummedData(ChecksummedData {
                    content: chunk,
                    crc32c: Some(crc32c),
                })),
                finish_write: is_last,
                ..Default::default()
            });

            offset = end;
        }

        // Create the request stream and send it
        let request_stream = stream::iter(requests);
        let mut req = Request::new(request_stream);
        self.add_common_headers(req.metadata_mut())?;

        // Drain the response stream until the final object resource arrives
        match self.client.bidi_write_object(req).await {
            Ok(response) => {
                println!("Resumable upload started! Processing responses...");
                let mut stream = response.into_inner();
                while let Some(response) = stream.next().await {
                    match response {
                        Ok(resp) => {
                            if let Some(status) = resp.write_status {
                                match status {
                                    bidi_write_object_response::WriteStatus::Resource(obj) => {
                                        println!("Upload complete with object: {:?}", obj);
                                    }
                                    bidi_write_object_response::WriteStatus::PersistedSize(size) => {
                                        println!("Upload in progress, persisted size: {}", size);
                                    }
                                }
                            }
                        }
                        Err(e) => {
                            println!("Error in response stream: {:?}", e);
                            return Err(Box::new(e));
                        }
                    }
                }
                Ok(())
            }
            Err(e) => {
                println!("Error details: {:?}", e);
                Err(Box::new(e))
            }
        }
    }

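    // Routes to the simple or resumable path based on the payload size threshold.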
    async fn upload_object(
        &mut self,
        object_name: &str,
        data: Vec<u8>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        if (data.len() as u64) < MIN_MULTIPART_SIZE {
            println!("Performing simple upload");
            self.simple_upload(object_name, data).await?;
        } else {
            println!("Performing resumable upload");
            self.resumable_upload(object_name, data).await?;
        }
        Ok(())
    }

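    // Probes for an object with a ReadObject call and inspects the metadata in
    // the first response message; returns the object size if it exists.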
    async fn check_object_exists(
        &mut self,
        object_name: &str,
    ) -> Result<Option<i64>, Box<dyn std::error::Error>> {
        let request = ReadObjectRequest {
            bucket: self.get_formatted_bucket(),
            object: object_name.to_string(),
            ..Default::default()
        };
        let request = self.auth_request(request).await;

        match self.client.read_object(request).await {
            Ok(response) => {
                let mut stream = response.into_inner();
                if let Some(Ok(first_message)) = StreamExt::next(&mut stream).await {
                    if let Some(metadata) = first_message.metadata {
                        return Ok(Some(metadata.size));
                    }
                }
                Ok(None)
            }
            Err(e) => {
                println!("Error details: {:?}", e);
                Err(Box::new(e))
            }
        }
    }

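    // Streams the object down and concatenates the checksummed data chunks.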
    async fn download_object(
        &mut self,
        object_name: &str,
    ) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        let request = ReadObjectRequest {
            bucket: self.get_formatted_bucket(),
            object: object_name.to_string(),
            ..Default::default()
        };
        let request = self.auth_request(request).await;

        let response = self.client.read_object(request).await?;
        let mut stream = response.into_inner();
        let mut content = Vec::new();

        while let Some(chunk) = StreamExt::next(&mut stream).await {
            let chunk = chunk?;
            if let Some(data) = chunk.checksummed_data {
                content.extend(data.content);
            }
        }
        Ok(content)
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let bucket = BUCKET_NAME.to_string();
    let mut client = GcsClient::new(bucket).await?;

    // Test small file upload
    println!("\n=== Testing Small File Upload ===");
    let small_content = b"Hello, GCS! This is a test.".to_vec();
    let small_object_name = "bidi-test-small-file.txt";
    println!("Uploading small file: {}", small_object_name);
    client.upload_object(small_object_name, small_content.clone()).await?;
    println!("Small file upload complete!");

    // Test large file upload
    println!("\n=== Testing Large File Upload ===");
    // Create a large file (11MB) to trigger the resumable upload path
    let large_content = vec![b'A'; 11 * 1024 * 1024];
    let large_object_name = "bidi-test-large-file.txt";
    println!("Uploading large file: {}", large_object_name);
    client.upload_object(large_object_name, large_content.clone()).await?;
    println!("Large file upload complete!");

    // Check if the large file exists
    println!("\n=== Checking Large File ===");
    if let Some(size) = client.check_object_exists(large_object_name).await? {
        println!("Large file exists with size: {} bytes", size);
    } else {
        println!("Large file not found!");
    }

    // Download and verify the large file
    println!("\n=== Downloading Large File ===");
    let downloaded = client.download_object(large_object_name).await?;
    println!("Downloaded size: {} bytes", downloaded.len());
    println!("Content verification: {}", downloaded == large_content);

    Ok(())
}
These are the dependencies associated with the above example:
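The crate names below come straight from the `use` statements in the example; the version numbers and feature flags are illustrative assumptions rather than the exact ones used in the original gist, so adjust them to your setup:

[dependencies]
# Versions and feature names below are assumptions, not taken from the original.
gcloud-sdk = { version = "0.25", features = ["google-storage-v2"] }
tonic = { version = "0.12", features = ["tls-native-roots"] }
futures = "0.3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1"
crc32c = "0.6"
urlencoding = "2"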