Skip to content

Instantly share code, notes, and snippets.

@milenkovicm
Forked from a-agmon/df_lambda.rs
Created October 16, 2024 12:26
Show Gist options
  • Save milenkovicm/d07b93984d7d28ab595c725b6bffdbe7 to your computer and use it in GitHub Desktop.
Rusty AWS Lambda with Data Fusion and Iceberg
use std::{sync::Arc, time::Instant};
use datafusion::{
arrow::{array::RecordBatch, json::ArrayWriter},
prelude::SessionContext,
};
use iceberg::{io::FileIO, table::StaticTable, TableIdent};
use iceberg_datafusion::IcebergTableProvider;
use lambda_runtime::{
run, service_fn,
tracing::{self, info},
Error, LambdaEvent,
};
use serde_json::Value;
// main lambda entry point
// Lambda entry point: extracts the SQL query from the event payload, runs it
// against the shared DataFusion session, and returns the result set as JSON.
async fn function_handler(
    sql_context: &SessionContext,
    event: LambdaEvent<Value>,
) -> Result<Value, Error> {
    // Missing/non-string "query" becomes an error rather than a panic.
    let query = event.payload["query"]
        .as_str()
        .ok_or("unable to extract query from request")?;
    let batches = sql_context.sql(query).await?.collect().await?;
    let json = record_batches_to_json(batches).await?;
    Ok(json)
}
// convert results from DataFusion to Json
async fn record_batches_to_json(records: Vec<RecordBatch>) -> anyhow::Result<Value> {
let record_buffer = Vec::new();
let mut json_writer = ArrayWriter::new(record_buffer);
records.into_iter().for_each(|batch| {
if batch.num_rows() > 0 {
let written = json_writer.write(&batch);
if let Err(e) = written {
tracing::error!("Error writing record batch: {:?}", e);
}
}
});
json_writer.finish()?;
let record_buffer = json_writer.into_inner();
let json_val: Value = serde_json::from_slice(&record_buffer)?;
Ok(json_val)
}
// register iceberg table with data fusion using Iceberg-Rust
// and get the sql context to query the table
// Load an Iceberg table from its metadata file and register it with a fresh
// DataFusion `SessionContext` so it can be queried by `table_name`.
pub async fn get_sql_context(
    metadata_loc: &str,
    table_name: &str,
    schema: &str,
) -> anyhow::Result<SessionContext> {
    let file_io = FileIO::from_path(metadata_loc)?.build()?;
    let ident = TableIdent::from_strs([schema, table_name])?;
    // Static table: reads the metadata snapshot once, no catalog round-trips.
    let table = StaticTable::from_metadata_file(metadata_loc, ident, file_io)
        .await?
        .into_table();
    let provider = IcebergTableProvider::try_new_from_table(table).await?;
    let ctx = SessionContext::new();
    ctx.register_table(table_name, Arc::new(provider))?;
    Ok(ctx)
}
// get the path of the table metadata file from Glue
// Resolve the Iceberg metadata file location for a Glue-catalogued table.
//
// Looks the table up in the AWS Glue Data Catalog and returns the value of its
// `metadata_location` table property.
//
// Errors if the table does not exist, has no parameters, or lacks the
// `metadata_location` property. The original `unwrap()` on the response's
// optional `table` field would panic on a missing table; this now fails with a
// descriptive error instead.
pub async fn get_glue_table_iceberg_metadata(
    table_name: &str,
    data_base_name: &str,
) -> anyhow::Result<String> {
    let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
    let client = aws_sdk_glue::Client::new(&config);
    let response = client
        .get_table()
        .database_name(data_base_name)
        .name(table_name)
        .send()
        .await?;
    // `table` is optional in the SDK response; never panic on I/O-derived data.
    let table = response.table.ok_or_else(|| {
        anyhow::anyhow!("table {data_base_name}.{table_name} not found in Glue catalog")
    })?;
    let properties = table
        .parameters
        .ok_or_else(|| anyhow::anyhow!("No parameters found on table"))?;
    // Fixed garbled message ("…on table props found on table") and switched to
    // lazy `ok_or_else` so no error is allocated on the success path.
    let metadata_location = properties
        .get("metadata_location")
        .ok_or_else(|| anyhow::anyhow!("metadata_location not found on table properties"))?;
    Ok(metadata_location.to_string())
}
// Glue catalog coordinates of the table to expose; adjust for your deployment.
// `const` (inlined compile-time value) is the idiomatic choice over `static`
// for simple immutable configuration strings; reads are unchanged for callers.
const TABLE_NAME: &str = "glue_table";
const SCHEMA_NAME: &str = "glue_database";
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing::init_default_subscriber();
let (table, schema) = (TABLE_NAME, SCHEMA_NAME);
let metadata_loc = get_glue_table_iceberg_metadata(table, schema).await?;
let sql_context = get_sql_context(&metadata_loc, table, schema).await?;
let sql_context = Arc::new(sql_context);
run(service_fn(move |event: LambdaEvent<Value>| {
let context = sql_context.clone();
async move { function_handler(&context, event).await }
}))
.await
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment