GCS storage operations with gRPC in Rust
use gcloud_sdk::google::storage::v2::{
    bidi_write_object_request, bidi_write_object_response, storage_client::StorageClient,
    write_object_request, BidiWriteObjectRequest, ChecksummedData, Object, ReadObjectRequest,
    StartResumableWriteRequest, WriteObjectRequest, WriteObjectSpec,
};
use futures::stream;
use tokio_stream::StreamExt;
use tonic::transport::{Channel, ClientTlsConfig};
use tonic::{metadata::MetadataValue, Request};
// Constants for GCS operations.
// The docs recommend an 8MB chunk size, but in practice the gRPC connection
// cannot handle messages larger than 4MB. We also stay slightly below the 4MB
// limit so the request still fits once metadata is added; roughly 15 extra
// bytes were observed on each write request.
const MIN_MULTIPART_SIZE: u64 = 4 * 1024 * 1000;
const CHUNK_SIZE: usize = 4 * 1024 * 1000;
const BUCKET_NAME: &str = "test-bucket";
const GOOGLE_AUTH_TOKEN: &str = "token-xxxxxxxx";
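// Thin wrapper around the generated Storage v2 gRPC client, bound to a single bucket.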
struct GcsClient {
    client: StorageClient<Channel>,
    bucket: String,
}
impl GcsClient {
    async fn new(bucket: String) -> Result<Self, Box<dyn std::error::Error>> {
        let channel = Channel::from_static("https://storage.googleapis.com")
            .connect_timeout(std::time::Duration::from_secs(5))
            .timeout(std::time::Duration::from_secs(30))
            .tcp_nodelay(true)
            .http2_adaptive_window(true)
            .http2_keep_alive_interval(std::time::Duration::from_secs(30))
            .tls_config(ClientTlsConfig::new().with_native_roots())?
            .connect()
            .await?;
        Ok(Self {
            client: StorageClient::new(channel),
            bucket,
        })
    }
    fn get_formatted_bucket(&self) -> String {
        format!("projects/_/buckets/{}", self.bucket)
    }
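    // Every call carries a Bearer token and an x-goog-request-params header with the
    // URL-encoded bucket resource name, which the service uses for request routing.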
    fn add_common_headers(&self, metadata: &mut tonic::metadata::MetadataMap) -> Result<(), Box<dyn std::error::Error>> {
        // Add authorization header
        let token = GOOGLE_AUTH_TOKEN;
        metadata.insert(
            "authorization",
            MetadataValue::try_from(&format!("Bearer {}", token)).unwrap(),
        );
        // Add bucket parameter
        let bucket = self.get_formatted_bucket();
        let encoded_bucket = urlencoding::encode(&bucket);
        let params = format!("bucket={}", encoded_bucket);
        metadata.insert(
            "x-goog-request-params",
            MetadataValue::try_from(&params).unwrap(),
        );
        Ok(())
    }
    // Wrap a request and attach the common headers.
    async fn auth_request<T>(&self, request: T) -> Request<T> {
        let mut request = Request::new(request);
        self.add_common_headers(request.metadata_mut())
            .expect("Failed to add headers");
        request
    }
    async fn simple_upload(&mut self, object_name: &str, data: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
        let size = data.len() as i64;
        let bucket = self.get_formatted_bucket();
        // Create write specification with proper bucket format
        let write_spec = WriteObjectSpec {
            resource: Some(Object {
                name: object_name.to_string(),
                bucket: bucket.clone(),
                size,
                content_type: "application/octet-stream".to_string(),
                ..Default::default()
            }),
            object_size: Some(size),
            ..Default::default()
        };
        let crc32c = crc32c::crc32c(&data);
        let init_request = WriteObjectRequest {
            first_message: Some(write_object_request::FirstMessage::WriteObjectSpec(write_spec)),
            write_offset: 0,
            data: None,
            finish_write: false,
            ..Default::default()
        };
        let data_request = WriteObjectRequest {
            first_message: None,
            write_offset: 0,
            data: Some(write_object_request::Data::ChecksummedData(ChecksummedData {
                content: data,
                crc32c: Some(crc32c),
            })),
            finish_write: true,
            ..Default::default()
        };
        let request_stream = stream::iter(vec![init_request, data_request]);
        let mut req = Request::new(request_stream);
        // Use the common header function
        self.add_common_headers(req.metadata_mut())?;
        match self.client.write_object(req).await {
            Ok(_) => Ok(()),
            Err(e) => {
                println!("Error details: {:?}", e);
                Err(Box::new(e))
            }
        }
    }
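    // Resumable upload: StartResumableWrite returns an upload ID, then the data is
    // streamed in CHUNK_SIZE pieces over a BidiWriteObject stream.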
    async fn resumable_upload(&mut self, object_name: &str, data: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
        let size = data.len() as i64;
        let bucket = self.get_formatted_bucket();
        // Get upload ID first
        let start_request = StartResumableWriteRequest {
            write_object_spec: Some(WriteObjectSpec {
                resource: Some(Object {
                    name: object_name.to_string(),
                    bucket: bucket.clone(),
                    size,
                    content_type: "application/octet-stream".to_string(),
                    ..Default::default()
                }),
                object_size: Some(size),
                ..Default::default()
            }),
            common_object_request_params: None,
            object_checksums: None,
        };
        let request = self.auth_request(start_request).await;
        let response = self.client.start_resumable_write(request).await?;
        let upload_id = response.into_inner().upload_id;
        // Prepare chunks for BidiWriteObjectRequest
        let total_size = data.len();
        let mut offset = 0;
        let mut requests = Vec::new();
        // First message with upload ID
        requests.push(BidiWriteObjectRequest {
            first_message: Some(bidi_write_object_request::FirstMessage::UploadId(upload_id.clone())),
            write_offset: 0,
            finish_write: false,
            data: None,
            ..Default::default()
        });
        // Split data into chunks
        while offset < total_size {
            let end = std::cmp::min(offset + CHUNK_SIZE, total_size);
            let chunk = data[offset..end].to_vec();
            let is_last = end == total_size;
            let crc32c = crc32c::crc32c(&chunk);
            requests.push(BidiWriteObjectRequest {
                first_message: None,
                write_offset: offset as i64,
                data: Some(bidi_write_object_request::Data::ChecksummedData(ChecksummedData {
                    content: chunk,
                    crc32c: Some(crc32c),
                })),
                finish_write: is_last,
                ..Default::default()
            });
            offset = end;
        }
        // Create stream and send requests
        let request_stream = stream::iter(requests);
        let mut req = Request::new(request_stream);
        self.add_common_headers(req.metadata_mut())?;
        // Send the bidi stream and drain the server's responses.
        match self.client.bidi_write_object(req).await {
            Ok(response) => {
                println!("Resumable upload started! Processing responses...");
                let mut stream = response.into_inner();
                while let Some(response) = stream.next().await {
                    match response {
                        Ok(resp) => {
                            if let Some(status) = resp.write_status {
                                match status {
                                    bidi_write_object_response::WriteStatus::Resource(obj) => {
                                        println!("Upload complete with object: {:?}", obj);
                                    }
                                    bidi_write_object_response::WriteStatus::PersistedSize(size) => {
                                        println!("Upload in progress, persisted size: {}", size);
                                    }
                                }
                            }
                        }
                        Err(e) => {
                            println!("Error in response stream: {:?}", e);
                            return Err(Box::new(e));
                        }
                    }
                }
                Ok(())
            }
            Err(e) => {
                println!("Error details: {:?}", e);
                Err(Box::new(e))
            }
        }
    }
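    // Choose the simple or resumable path based on payload size.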
    async fn upload_object(&mut self, object_name: &str, data: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
        if (data.len() as u64) < MIN_MULTIPART_SIZE {
            println!("Performing simple upload");
            self.simple_upload(object_name, data).await?;
        } else {
            println!("Performing resumable upload");
            self.resumable_upload(object_name, data).await?;
        }
        Ok(())
    }
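    // ReadObject streams the object; the first response message carries the object
    // metadata, which is all we need to confirm existence and size.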
    async fn check_object_exists(&mut self, object_name: &str) -> Result<Option<i64>, Box<dyn std::error::Error>> {
        let request = ReadObjectRequest {
            bucket: self.get_formatted_bucket(),
            object: object_name.to_string(),
            ..Default::default()
        };
        let request = self.auth_request(request).await;
        match self.client.read_object(request).await {
            Ok(response) => {
                let mut stream = response.into_inner();
                if let Some(Ok(first_message)) = StreamExt::next(&mut stream).await {
                    if let Some(metadata) = first_message.metadata {
                        return Ok(Some(metadata.size));
                    }
                }
                Ok(None)
            }
            Err(e) => {
                println!("Error details: {:?}", e);
                Err(Box::new(e))
            }
        }
    }
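    // Download the whole object by concatenating the checksummed data chunks from the stream.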
    async fn download_object(&mut self, object_name: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
        let request = ReadObjectRequest {
            bucket: self.get_formatted_bucket(),
            object: object_name.to_string(),
            ..Default::default()
        };
        let request = self.auth_request(request).await;
        let response = self.client.read_object(request).await?;
        let mut stream = response.into_inner();
        let mut content = Vec::new();
        while let Some(chunk) = StreamExt::next(&mut stream).await {
            let chunk = chunk?;
            if let Some(data) = chunk.checksummed_data {
                content.extend(data.content);
            }
        }
        Ok(content)
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let bucket = BUCKET_NAME.to_string();
    let mut client = GcsClient::new(bucket).await?;
    // Test small file upload
    println!("\n=== Testing Small File Upload ===");
    let small_content = b"Hello, GCS! This is a test.".to_vec();
    let small_object_name = "bidi-test-small-file.txt";
    println!("Uploading small file: {}", small_object_name);
    client.upload_object(small_object_name, small_content.clone()).await?;
    println!("Small file upload complete!");
    // Test large file upload
    println!("\n=== Testing Large File Upload ===");
    // Create a large file (11MB) to trigger the resumable upload path
    let large_content = vec![b'A'; 11 * 1024 * 1024];
    let large_object_name = "bidi-test-large-file.txt";
    println!("Uploading large file: {}", large_object_name);
    client.upload_object(large_object_name, large_content.clone()).await?;
    println!("Large file upload complete!");
    // Check if large file exists
    println!("\n=== Checking Large File ===");
    if let Some(size) = client.check_object_exists(large_object_name).await? {
        println!("Large file exists with size: {} bytes", size);
    } else {
        println!("Large file not found!");
    }
    // Download and verify large file
    println!("\n=== Downloading Large File ===");
    let downloaded = client.download_object(large_object_name).await?;
    println!("Downloaded size: {} bytes", downloaded.len());
    println!("Content verification: {}", downloaded == large_content);
    Ok(())
}
amankrx commented Jan 7, 2025

These are the dependencies associated with the above example:

[dependencies]
hyper = { version = "0.14", features = ["stream"] }
hyper-rustls = { version = "0.23", features = ["http2"] }
tokio = { version = "1.42.0", features = ["full"] }
futures = "0.3.31"
tonic = { version = "^0.12.3", features = ["transport","tls","tls-roots"] }
tokio-stream = "0.1.17"
urlencoding = "2.1.3"
crc32c = "0.6.8"
gcloud-sdk = { version = "0.26.1", features = ["google-storage-v2"] }
