Dagster dynamic asset graph mixing dynamic mapping (one edge for each file to load) and dynamic outputs (a file is routed to a different node/function depending on its type), with the definitions created by factory functions.
from typing import List

from dagster import (
    Array, AssetSelection, AssetsDefinition, DailyPartitionsDefinition,
    DynamicOut, DynamicOutput, Field, OpExecutionContext, Output,
    asset, define_asset_job, graph, op, repository, sensor,
)

DAILY_PARTITIONS = DailyPartitionsDefinition(start_date="2022-06-01")


@asset(
    description="Files to load",
    partitions_def=DAILY_PARTITIONS,
    key_prefix="source",
    config_schema={
        "selected_file_paths": Field(Array(str), is_required=False, default_value=[])
    },
)
def source_files(context: OpExecutionContext):
    selected_file_paths = context.op_config["selected_file_paths"]
    if selected_file_paths:
        context.log.info(f"Found selected file paths: {selected_file_paths}")
        file_paths = selected_file_paths
    else:
        context.log.info("Looking for paths matching the pattern.")
        file_paths = ["type_1_a", "type_1_b", "type_2_c"]
    return Output(file_paths)
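
# Wrapping the list in Output (rather than returning it directly) leaves room
# to attach metadata to the materialization later.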


@op(
    out={
        "type_1": DynamicOut(),
        "type_2": DynamicOut(),
    },
)
def output_files_dynamically(context: OpExecutionContext, source_files):
    # Route each file to the dynamic output matching its type.
    for file_path in source_files:
        if "type_1" in file_path:
            yield DynamicOutput(
                value=file_path,
                output_name="type_1",
                mapping_key=file_path,
            )
        elif "type_2" in file_path:
            yield DynamicOutput(
                value=file_path,
                output_name="type_2",
                mapping_key=file_path,
            )
        else:
            context.log.warning(
                f"The path pattern of file {file_path} is not among the "
                "supported file path patterns: type_1, type_2."
            )
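
# Note: mapping keys only allow a restricted character set (alphanumerics and
# underscores, as far as I know), so real file paths containing "/" or "."
# would need sanitizing before being used as mapping_key, e.g.:
#
#     import re
#     mapping_key = re.sub(r"\W", "_", file_path)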


@op
def load_to_table_type_1(context, file_name):
    """Load the file into the table."""
    context.log.info(f"Loading to table for file {file_name}")
    return file_name


@op
def load_to_table_type_2(context, file_name):
    """Load the file into the table."""
    context.log.info(f"Loading to table for file {file_name}")
    return file_name


@op(description="Loaded table")
def merge(type_1_files: List[str], type_2_files: List[str]):
    """Merge all the files."""
    # Placeholder merge: concatenate the two lists of loaded file names so the
    # op has a concrete, runnable body.
    return type_1_files + type_2_files


@graph(name="table", description="Load the files into the table")
def load_files_to_table_graph(source_files):
    type_1_files, type_2_files = output_files_dynamically(source_files)
    processed_type_1_files = type_1_files.map(load_to_table_type_1).collect()
    processed_type_2_files = type_2_files.map(load_to_table_type_2).collect()
    return merge(processed_type_1_files, processed_type_2_files)
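
# At runtime, .map() fans out into one invocation of the load op per
# DynamicOutput (step keys look like load_to_table_type_1[type_1_a]) and
# .collect() fans the results back in as a single list for merge().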


table = AssetsDefinition.from_graph(
    load_files_to_table_graph,
    partitions_def=DAILY_PARTITIONS,
    # Wire the graph input to the key of the upstream source_files asset.
    keys_by_input_name={"source_files": source_files.key},
    key_prefix="source",
)

load_files_to_table_job = define_asset_job(
    name="load_files_to_table_job",
    selection=AssetSelection.assets(source_files, table),
    partitions_def=DAILY_PARTITIONS,
)


@sensor(job=load_files_to_table_job)
def new_files_sensor():
    # Some file-detection logic returns this:
    new_files_partitions = [
        {
            "partition_date": "2022-11-05",
            "selected_file_paths": ["d_type_1", "e_type_2", "x_type_2", "y_type_3"],
        },
        {
            "partition_date": "2022-11-06",
            "selected_file_paths": ["f_type_1", "g_type_2"],
        },
    ]
    # The asset's underlying op is named after its key, e.g. "source__source_files".
    source_files_op_name = source_files.key.to_python_identifier()
    for new_files_partition in new_files_partitions:
        run_config = {
            "ops": {
                source_files_op_name: {
                    "config": {
                        "selected_file_paths": new_files_partition["selected_file_paths"]
                    }
                }
            }
        }
        yield load_files_to_table_job.run_request_for_partition(
            partition_key=new_files_partition["partition_date"],
            run_config=run_config,
        )
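
# Note: sensors deduplicate run requests by run_key; if the detection logic can
# re-emit the same batch, attaching a run_key per partition would make repeated
# ticks idempotent.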


@repository
def repo():
    return [
        source_files,
        table,
        new_files_sensor,
    ]
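

# A minimal local-run sketch for manual testing, assuming dagster's in-process
# `materialize` helper; the partition key and file paths are arbitrary examples.
if __name__ == "__main__":
    from dagster import materialize

    result = materialize(
        [source_files, table],
        partition_key="2022-11-05",
        run_config={
            "ops": {
                source_files.key.to_python_identifier(): {
                    "config": {"selected_file_paths": ["d_type_1", "e_type_2"]}
                }
            }
        },
    )
    assert result.success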