@Per48edjes
Last active September 28, 2024 07:35
Attempt at distributing a workload efficiently to achieve incremental transforms without ever listing the full S3 bucket contents
from transforms.api import transform, Input, Output


@transform(
    synced_paths_out=Output("/data_project/manifests/trp_synced_paths_manifest"),
    incremental_manifest=Output("/data_project/manifests/trp_incremental_manifest"),
    synced_paths=Input("/data_project/manifests/trp_synced_paths_manifest"),
    bucket_data=Input("/data_project/input/s3_bucket_data_source"),  # Data source for your bucket content
)
def incremental_sync_and_update(ctx, synced_paths_out, incremental_manifest, synced_paths, bucket_data):
    # Use the Foundry-provided Spark session
    spark = ctx.spark_session

    # Load the previously synced paths dataset
    synced_paths_df = synced_paths.dataframe()

    # Extract the distinct UUID prefixes present in the bucket data source
    all_uuids_df = bucket_data.dataframe().selectExpr("split(path, '/')[0] AS uuid").distinct()

    # Extract the distinct UUID prefixes that have already been synced
    synced_uuids_df = synced_paths_df.selectExpr("split(path, '/')[0] AS uuid").distinct()

    # Determine the unsynced UUIDs using a left anti-join
    unsynced_uuids_df = all_uuids_df.join(synced_uuids_df, on="uuid", how="left_anti")
    unsynced_uuids = [row["uuid"] for row in unsynced_uuids_df.collect()]

    # Read only the unsynced prefixes in a single distributed load; Spark
    # parallelizes the listing and reading of these prefixes across executors.
    # (spark.read cannot be called inside rdd.map(): the SparkSession is not
    # available on executors, so DataFrames cannot be created there.)
    prefix_paths = [f"s3a://your-bucket-name/{uuid}/" for uuid in unsynced_uuids]
    if prefix_paths:
        listed_df = (
            spark.read.format("binaryFile")
            .option("recursiveFileLookup", "true")  # descend into <uuid>/<n>/tracks/
            .load(prefix_paths)
        )
        new_files_df = listed_df.filter(listed_df.path.rlike(r".*/[0-9]+/tracks/.*")).select("path")
    else:
        new_files_df = spark.createDataFrame([], schema=synced_paths_df.schema)

    # Left anti-join to remove paths that have already been synced
    incremental_files_df = new_files_df.join(synced_paths_df, on="path", how="left_anti")
    incremental_manifest_df = incremental_files_df.select("path")

    # Append the new paths to the synced paths manifest
    updated_synced_paths_df = synced_paths_df.union(incremental_manifest_df).dropDuplicates(["path"])

    # Write both the updated synced paths and the incremental manifest as Foundry datasets
    synced_paths_out.write_dataframe(updated_synced_paths_df)
    incremental_manifest.write_dataframe(incremental_manifest_df)
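
For reference, here is a minimal, self-contained sketch of the manifest-diff idea the transform relies on: a left anti-join on path yields exactly the files present in the latest listing but absent from the synced-paths manifest. Dataset contents and names below are illustrative toy data, not part of the transform above.

from pyspark.sql import SparkSession

# Toy data: the manifest of already-synced paths vs. a fresh listing.
spark = SparkSession.builder.master("local[*]").getOrCreate()

synced = spark.createDataFrame([("a/1/tracks/x.bin",), ("b/2/tracks/y.bin",)], ["path"])
listed = spark.createDataFrame([("a/1/tracks/x.bin",), ("c/3/tracks/z.bin",)], ["path"])

# Paths in the listing but not in the manifest: the incremental set.
delta = listed.join(synced, on="path", how="left_anti")
delta.show(truncate=False)  # only c/3/tracks/z.bin remains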