Skip to content

Instantly share code, notes, and snippets.

@doriansmiley
Created November 10, 2025 04:13
Show Gist options
  • Select an option

  • Save doriansmiley/dd30566b60eccc507b43134a3c0ea88b to your computer and use it in GitHub Desktop.

Select an option

Save doriansmiley/dd30566b60eccc507b43134a3c0ea88b to your computer and use it in GitHub Desktop.
Foundry Lightweight Transform: SQLFrame (DuckDB) demo for service-level patient analytics.

This gist shows a minimal, anonymized Foundry lightweight transform that reads two tabular datasets (staff, patients), builds SQLFrame DataFrames on a DuckDB session, computes service-level aggregates, and writes the results back to a Foundry output.
from transforms.api import LightweightOutput
from sqlframe.duckdb import DuckDBSession
from sqlframe.duckdb import functions as F
from transforms.api import LightweightInput, transform, Output, Input
@transform.using(
    staff=Input("ri.foundry.xxx"),
    patients=Input("ri.foundry.xxxx"),
    # BUG FIX: the original was missing the closing quote on this RID string,
    # which made the whole module a syntax error.
    staff_stats=Output("ri.foundry.xxx"),
    employee_stats=Output("ri.foundry.xxxx"),
)
def analyze_employees(
    staff: LightweightInput,
    patients: LightweightInput,
    staff_stats: LightweightOutput,
    employee_stats: LightweightOutput,
):
    """Compute service-level patient aggregates with SQLFrame on DuckDB.

    Reads the staff and patients datasets as pandas, wraps them in SQLFrame
    DataFrames on a DuckDB session, casts key columns, computes per-service
    patient counts, average age and average satisfaction, joins the two
    aggregates on ``service``, and writes the combined table to
    ``staff_stats``.

    Args:
        staff: Input dataset with (at least) a ``staff_id`` column.
        patients: Input dataset with ``service``, ``age`` and
            ``satisfaction`` columns.
        staff_stats: Output that receives the joined per-service aggregates.
        employee_stats: Declared output.
            NOTE(review): this output is declared but never written — the
            write below is commented out as lesson material. Confirm whether
            Foundry requires every declared output to be written.

    References:
        - Dataset: https://www.kaggle.com/datasets/jaderz/hospital-beds-management?resource=download
        - SQLFrame: https://github.com/eakmanrq/sqlframe
        - Foundry Transforms API: https://www.palantir.com/docs/foundry/transforms-python/advanced-compute
    """
    # 1. Initialize the SQLFrame DuckDB session (reuses an existing one if present).
    session = DuckDBSession.builder.getOrCreate()

    # 2) Read inputs as Arrow (works in lightweight), for development in the lesson
    # staff_arrow = staff.arrow()  # pyarrow.Table
    # patients_arrow = patients.arrow()  # pyarrow.Table

    # 3) Create SQLFrame DFs (pandas -> SQLFrame). In the lesson we'll attempt to use Arrow.
    df_staff = session.createDataFrame(staff.pandas())
    df_patients = session.createDataFrame(patients.pandas())

    # Ensure column types are correct if necessary (SQLFrame infers schema automatically).
    df_staff = df_staff.withColumn("staff_id", F.col("staff_id").cast("string"))
    df_patients = df_patients.withColumn(
        "age", F.col("age").cast("integer")
    ).withColumn("satisfaction", F.col("satisfaction").cast("integer"))

    # 4. Aggregate patient count and average age per service, newest-oldest by age.
    df_agg_patients = (
        df_patients.groupBy("service")
        .agg(
            F.count("*").alias("total_patients"),
            F.avg("age").alias("average_age"),
        )
        .orderBy(F.col("average_age").desc())
    )

    # 5. Second aggregation: per-service count plus average satisfaction.
    df_ranked = (
        df_patients
        .groupBy("service")
        .agg(
            F.count("*").alias("service_count"),
            F.avg("satisfaction").alias("average_satisfaction"),  # presumably 0-100
        )
    )

    # Example join to exercise more of the Spark-compatible API; left join keeps
    # every service from the first aggregate even if the second lacks it.
    df_agg_with_rank = df_agg_patients.join(df_ranked, on="service", how="left")

    # 6. Write results back to Foundry: collect into a pandas DataFrame and write it.
    staff_stats.write_table(df_agg_with_rank.toPandas())

    # Write ranked employee results: collect results into an Arrow Table and write it.
    # arrow_table_employee_stats = df_ranked.to_arrow()
    # employee_stats.write_table(arrow_table_employee_stats)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment