Attempt at distributing a workload efficiently to achieve incremental transforms without ever listing S3 contents
from functools import reduce

from transforms.api import transform, Input, Output


# NOTE: the original gist used @transform_df, which only supports a single output;
# @transform is used here so that both manifests can be written from one transform.
@transform(
    synced_paths_out=Output("/data_project/manifests/trp_synced_paths_manifest"),
    incremental_manifest_out=Output("/data_project/manifests/trp_incremental_manifest"),
    synced_paths=Input("/data_project/manifests/trp_synced_paths_manifest"),
    bucket_data=Input("/data_project/input/s3_bucket_data_source"),  # Data source for your bucket content
)
def incremental_sync_and_update(ctx, synced_paths_out, incremental_manifest_out, synced_paths, bucket_data):
    # Use the Foundry-provided Spark session
    spark = ctx.spark_session

    # Previously synced paths and the bucket content listing, as DataFrames.
    # (Reading the synced-paths dataset back in as an Input of the same transform is
    # the original design; if the platform rejects the self-reference, Foundry's
    # @incremental decorator is the usual alternative.)
    synced_paths_df = synced_paths.dataframe()
    bucket_data_df = bucket_data.dataframe()

    # Distinct top-level UUID prefixes present in the bucket data
    all_uuids_df = bucket_data_df.selectExpr("split(path, '/')[0] AS uuid").distinct()

    # Distinct UUID prefixes that have already been synced
    synced_uuids_df = synced_paths_df.selectExpr("split(path, '/')[0] AS uuid").distinct()

    # Determine the unsynced UUIDs using a left anti-join
    unsynced_uuids_df = all_uuids_df.join(synced_uuids_df, on="uuid", how="left_anti")
    unsynced_uuids = [row["uuid"] for row in unsynced_uuids_df.collect()]

    # Load and filter the paths under a single UUID prefix. spark.read must be issued
    # from the driver (the Spark session is not available inside RDD tasks, so the
    # original rdd.map approach cannot work); the listing and reads that each load
    # triggers are still distributed across the executors.
    def load_and_filter_paths(uuid):
        prefix_path = f"s3a://your-bucket-name/{uuid}/"
        df = spark.read.format("binaryFile").load(prefix_path)
        return df.filter(df.path.rlike(r".*/[0-9]+/tracks/.*")).select("path")

    filtered_dfs = [load_and_filter_paths(uuid) for uuid in unsynced_uuids]

    # Combine the per-UUID DataFrames into a single DataFrame of candidate paths
    if filtered_dfs:
        new_files_df = reduce(lambda df1, df2: df1.union(df2), filtered_dfs)
    else:
        new_files_df = spark.createDataFrame([], schema=synced_paths_df.select("path").schema)

    # Left anti-join to drop paths that have already been synced
    incremental_files_df = new_files_df.join(synced_paths_df, on="path", how="left_anti")
    incremental_manifest_df = incremental_files_df.select("path")

    # Append the new paths to the synced paths manifest
    updated_synced_paths_df = (
        synced_paths_df.select("path").union(incremental_manifest_df).dropDuplicates(["path"])
    )

    # Write both the updated synced paths and the incremental manifest as Foundry datasets
    synced_paths_out.write_dataframe(updated_synced_paths_df)
    incremental_manifest_out.write_dataframe(incremental_manifest_df)
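For completeness, a minimal sketch of how a downstream Foundry transform could consume the incremental manifest produced above, so that each run reads only the newly discovered objects. The output path /data_project/output/trp_new_files, the transform name process_new_files, and the choice to re-read the objects with the binaryFile reader are illustrative assumptions, not part of the original gist.

from transforms.api import transform, Input, Output


@transform(
    out=Output("/data_project/output/trp_new_files"),  # hypothetical output dataset
    incremental_manifest=Input("/data_project/manifests/trp_incremental_manifest"),
)
def process_new_files(ctx, out, incremental_manifest):
    spark = ctx.spark_session

    # The manifest only ever contains the delta, so collecting it stays small
    new_paths = [row["path"] for row in incremental_manifest.dataframe().select("path").collect()]

    if new_paths:
        # Read only the files named in the manifest; S3 is never listed here either
        new_files_df = spark.read.format("binaryFile").load(new_paths).select("path", "length")
    else:
        new_files_df = spark.createDataFrame([], schema="path STRING, length LONG")

    out.write_dataframe(new_files_df)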
...aka an experiment with prompt engineering...