Created
August 8, 2023 15:43
-
-
Save NicolasPA/b3678a0ffb41f9e4b4ffda15fbb2431c to your computer and use it in GitHub Desktop.
Dagster dynamic asset graph, created by factory functions, that mixes dynamic mapping (one edge per file to load) with dynamic outputs (each file is routed to a different node/function depending on its type).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Daily partitioning shared by the source asset, the graph-backed asset and the job.
DAILY_PARTITIONS = DailyPartitionsDefinition(start_date="2022-06-01")
@asset(
    description="Files to load",
    partitions_def=DAILY_PARTITIONS,
    key_prefix="source",
    config_schema={
        "selected_file_paths": Field(Array(str), is_required=False, default_value=[])
    },
)
def source_files(context: OpExecutionContext):
    """Emit the list of file paths to load for this partition.

    Paths come from run config when provided (e.g. set by the sensor);
    otherwise a pattern-based discovery is used (stubbed here).
    """
    configured_paths = context.op_config["selected_file_paths"]
    if not configured_paths:
        context.log.info("Looking for paths matching the pattern.")
        # Stubbed discovery result; a real implementation would scan storage.
        return Output(["type_1_a", "type_1_b", "type_2_c"])
    context.log.info(f"Found selected file paths: {configured_paths}")
    return Output(configured_paths)
@op(
    out={
        "type_1": DynamicOut(),
        "type_2": DynamicOut(),
    },
)
def output_files_dynamically(context: OpExecutionContext, source_files):
    """Route each incoming file path onto the DynamicOut matching its type.

    A path containing "type_1" goes to the type_1 output, "type_2" to the
    type_2 output (type_1 wins if both match); anything else is logged
    with a warning and dropped.
    """
    for path in source_files:
        matched_output = None
        if "type_1" in path:
            matched_output = "type_1"
        elif "type_2" in path:
            matched_output = "type_2"
        if matched_output is None:
            context.log.warning(
                f"The path pattern of file: {path} is not among the supported file path patterns: type_1, type_2."
            )
            continue
        # The path itself is a unique, stable mapping key for the dynamic step.
        yield DynamicOutput(value=path, output_name=matched_output, mapping_key=path)
@op
def load_to_table_type_1(context, file_name):
    """Load one type_1 file into the table (stub: logs and passes the name through)."""
    message = f"Loading to table for file {file_name}"
    context.log.info(message)
    return file_name
@op
def load_to_table_type_2(context, file_name):
    """Load one type_2 file into the table (stub: logs and passes the name through)."""
    message = f"Loading to table for file {file_name}"
    context.log.info(message)
    return file_name
@op(description="Loaded table")
def merge(type1_files: List[str], type2_files: List[str]):
    """Merge all the files"""
    # Stub: merge logic is not implemented; the op exists so the graph has a
    # single terminal output that the graph-backed asset can materialize.
@graph(name="table", description="lala")
def load_files_to_table_graph(source_files):
    """Fan files out by type, load each one via a dynamic step, then merge."""
    dyn_type_1, dyn_type_2 = output_files_dynamically(source_files)
    loaded_type_1 = dyn_type_1.map(load_to_table_type_1).collect()
    loaded_type_2 = dyn_type_2.map(load_to_table_type_2).collect()
    return merge(loaded_type_1, loaded_type_2)
# Wrap the graph as a partitioned asset whose upstream input is the
# source_files asset (keys mapped explicitly, same "source" prefix).
table = AssetsDefinition.from_graph(
    load_files_to_table_graph,
    partitions_def=DAILY_PARTITIONS,
    keys_by_input_name={"source_files": source_files.asset_key},
    key_prefix="source",
)
# Partitioned job materializing both assets; targeted by the sensor below.
load_files_to_table_job = define_asset_job(
    name="load_files_to_table_job",
    selection=AssetSelection.assets(source_files, table),
    partitions_def=DAILY_PARTITIONS,
)
@sensor(job=load_files_to_table_job)
def new_files_sensor():
    """Yield one partitioned run request per batch of newly detected files,
    passing the detected paths to the source_files asset via run config."""
    # some files detection logic returns this:
    new_files_partitions = [
        {"partition_date": "2022-11-05", "selected_file_paths": ["d_type_1", "e_type_2", "x_type_2", "y_type_3"]},
        {"partition_date": "2022-11-06", "selected_file_paths": ["f_type_1", "g_type_2"]},
    ]
    # The prefixed asset key doubles as the op name used in run config.
    op_name = source_files.key.to_python_identifier()
    for batch in new_files_partitions:
        run_config = {
            "ops": {
                op_name: {
                    "config": {"selected_file_paths": batch["selected_file_paths"]}
                }
            }
        }
        yield load_files_to_table_job.run_request_for_partition(
            partition_key=batch["partition_date"], run_config=run_config
        )
@repository
def repo():
    """Register the source asset, the graph-backed asset and the sensor."""
    definitions = [source_files, table, new_files_sensor]
    return definitions
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment