-
-
Save milenkovicm/d07b93984d7d28ab595c725b6bffdbe7 to your computer and use it in GitHub Desktop.
Rusty AWS Lambda with DataFusion and Iceberg
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{sync::Arc, time::Instant}; | |
use datafusion::{ | |
arrow::{array::RecordBatch, json::ArrayWriter}, | |
prelude::SessionContext, | |
}; | |
use iceberg::{io::FileIO, table::StaticTable, TableIdent}; | |
use iceberg_datafusion::IcebergTableProvider; | |
use lambda_runtime::{ | |
run, service_fn, | |
tracing::{self, info}, | |
Error, LambdaEvent, | |
}; | |
use serde_json::Value; | |
// main lambda entry point | |
async fn function_handler( | |
sql_context: &SessionContext, | |
event: LambdaEvent<Value>, | |
) -> Result<Value, Error> { | |
let payload = event.payload; | |
let query = payload["query"] | |
.as_str() | |
.ok_or("unable to extract query from request")?; | |
let df = sql_context.sql(query).await?; | |
let records = df.collect().await?; | |
let json_records = record_batches_to_json(records).await?; | |
Ok(json_records) | |
} | |
// convert results from DataFusion to Json | |
async fn record_batches_to_json(records: Vec<RecordBatch>) -> anyhow::Result<Value> { | |
let record_buffer = Vec::new(); | |
let mut json_writer = ArrayWriter::new(record_buffer); | |
records.into_iter().for_each(|batch| { | |
if batch.num_rows() > 0 { | |
let written = json_writer.write(&batch); | |
if let Err(e) = written { | |
tracing::error!("Error writing record batch: {:?}", e); | |
} | |
} | |
}); | |
json_writer.finish()?; | |
let record_buffer = json_writer.into_inner(); | |
let json_val: Value = serde_json::from_slice(&record_buffer)?; | |
Ok(json_val) | |
} | |
// register iceberg table with data fusion using Iceberg-Rust | |
// and get the sql context to query the table | |
pub async fn get_sql_context( | |
metadata_loc: &str, | |
table_name: &str, | |
schema: &str, | |
) -> anyhow::Result<SessionContext> { | |
let file_io = FileIO::from_path(metadata_loc)?.build()?; | |
let table_indent = TableIdent::from_strs([schema, table_name])?; | |
let static_table = StaticTable::from_metadata_file(metadata_loc, table_indent, file_io).await?; | |
let table = static_table.into_table(); | |
let table_provider = IcebergTableProvider::try_new_from_table(table).await?; | |
let ctx = SessionContext::new(); | |
ctx.register_table(table_name, Arc::new(table_provider))?; | |
Ok(ctx) | |
} | |
// get the path of the table metadata file from Glue | |
pub async fn get_glue_table_iceberg_metadata( | |
table_name: &str, | |
data_base_name: &str, | |
) -> anyhow::Result<String> { | |
let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; | |
let client = aws_sdk_glue::Client::new(&config); | |
let table = client | |
.get_table() | |
.database_name(data_base_name) | |
.name(table_name) | |
.send() | |
.await?; | |
let table = table.table.unwrap(); | |
let properties = table | |
.parameters | |
.ok_or(anyhow::anyhow!("No parameters found on table"))?; | |
let metadata_location = properties.get("metadata_location").ok_or(anyhow::anyhow!( | |
"metadata_location not found on table props found on table" | |
))?; | |
Ok(metadata_location.to_string()) | |
} | |
/// Name of the Glue table that is looked up and registered for SQL queries.
const TABLE_NAME: &str = "glue_table";
/// Name of the Glue database holding [`TABLE_NAME`]; also used as the Iceberg namespace.
const SCHEMA_NAME: &str = "glue_database";
#[tokio::main] | |
async fn main() -> Result<(), Error> { | |
tracing::init_default_subscriber(); | |
let (table, schema) = (TABLE_NAME, SCHEMA_NAME); | |
let metadata_loc = get_glue_table_iceberg_metadata(table, schema).await?; | |
let sql_context = get_sql_context(&metadata_loc, table, schema).await?; | |
let sql_context = Arc::new(sql_context); | |
run(service_fn(move |event: LambdaEvent<Value>| { | |
let context = sql_context.clone(); | |
async move { function_handler(&context, event).await } | |
})) | |
.await | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment