Last active
April 2, 2023 04:13
-
-
Save ylashin/d23453f8a6ab6881f16cdf18daaf6427 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use arrow::{ | |
array::{Int32Array, StringArray}, | |
datatypes::{DataType, Field, Schema as ArrowSchema}, | |
record_batch::RecordBatch, | |
}; | |
use deltalake::operations::DeltaOps; | |
use deltalake::{builder::DeltaTableBuilder, DeltaTable}; | |
use std::collections::HashMap; | |
use std::sync::Arc; | |
fn get_data_to_write() -> RecordBatch { | |
let schema = Arc::new(ArrowSchema::new(vec![ | |
Field::new("id", DataType::Int32, false), | |
Field::new("name", DataType::Utf8, false), | |
])); | |
let ids: Vec<i32> = (1..=10).map(i32::from).collect(); | |
let names: Vec<String> = ids.iter().map(|x| format!("item {x}")).collect(); | |
let id_values = Int32Array::from(ids); | |
let name_values = StringArray::from(names); | |
RecordBatch::try_new(schema, vec![Arc::new(id_values), Arc::new(name_values)]).unwrap() | |
} | |
async fn append_to_table( | |
path: String, | |
backend_config: HashMap<String, String>, | |
batch: RecordBatch, | |
) -> DeltaTable { | |
let table = DeltaTableBuilder::from_uri(path) | |
.with_storage_options(backend_config) | |
.build() | |
.unwrap(); | |
let ops = DeltaOps::from(table); | |
let commit_result = ops.write(vec![batch.clone()]).await.unwrap(); | |
commit_result | |
} | |
#[tokio::main(flavor = "current_thread")] | |
async fn main() { | |
let s3_storage_location = "s3://delta-root/table1".to_string(); | |
let region = std::env::var("AWS_REGION").unwrap(); | |
let access_key = std::env::var("AWS_ACCESS_KEY_ID").unwrap(); | |
let secret_access_secret = std::env::var("AWS_SECRET_ACCESS_KEY").unwrap(); | |
let mut backend_config: HashMap<String, String> = HashMap::new(); | |
backend_config.insert("AWS_REGION".to_string(), region); | |
backend_config.insert("AWS_ACCESS_KEY_ID".to_string(), access_key); | |
backend_config.insert("AWS_SECRET_ACCESS_KEY".to_string(), secret_access_secret); | |
backend_config.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".to_string()); | |
let batch = get_data_to_write(); | |
let table = append_to_table( | |
s3_storage_location, | |
backend_config, | |
batch, | |
) | |
.await; | |
println!("Data inserted with version : {}", table.version()); | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment