|
from dagster import asset, AssetExecutionContext, DailyPartitionsDefinition, Definitions, define_asset_job, AssetSelection, AssetKey, ScheduleDefinition, AutoMaterializePolicy, AutoMaterializeRule |
|
|
|
|
|
# a partition definition is optional but allows us to "parameterize" |
|
# the pipelines by date for doing incremental runs & backfills |
|
|
|
# Dagster's daily partition keys use the zero-padded %Y-%m-%d format; the
# original "2024-5-01" (single-digit month) does not match the canonical
# key format and is normalized here to avoid parsing/key-mismatch issues.
daily_partitions = DailyPartitionsDefinition(start_date="2024-05-01")
|
|
|
# you can create assets in-line |
|
@asset(
    group_name="SAP",
    key_prefix="SAP",
    partitions_def=daily_partitions,
)
def a_table(context: AssetExecutionContext):
    """Materialize one daily partition of the SAP ``a_table`` asset."""
    context.log.info("Some information logged during execution")

    # do the work to update the SAP table for this partition
    partition = context.partition_key

    # attach execution-time metadata so it shows up in the asset catalog
    context.add_output_metadata(
        {"some_key_for_catalog": "some_execution_time_value"}
    )
|
|
|
@asset(
    group_name="SAP",
    key_prefix="SAP",
    partitions_def=daily_partitions,
)
def b_table(context: AssetExecutionContext):
    """Materialize one daily partition of the SAP ``b_table`` asset."""
    context.log.info("Some information logged during execution")

    # do the work to update the SAP table for this partition
    partition = context.partition_key

    # attach execution-time metadata so it shows up in the asset catalog
    context.add_output_metadata(
        {"some_key_for_catalog": "some_execution_time_value"}
    )
|
|
|
# or create assets more dynamically using a factory approach
def create_alegro_producers(tables_to_produce):
    """Build one daily-partitioned "alegro" producer asset per table name.

    Args:
        tables_to_produce: iterable of table names; each becomes an asset
            with key ``["alegro", <table>]``.

    Returns:
        List of asset definitions, one per requested table.
    """

    def _make_producer(table):
        # Bind ``table`` in its own function scope. The original version
        # decorated the inner function directly inside the loop, so the
        # f-string closed over the loop variable and — due to Python's
        # late-binding closures — every producer logged the LAST table
        # name at execution time. (The asset ``name=table`` was unaffected
        # because decorator arguments are evaluated eagerly.)
        @asset(
            group_name="alegro",
            key_prefix="alegro",
            partitions_def=daily_partitions,
            name=table,
        )
        def alegro_producer(context: AssetExecutionContext):
            context.log.info(f"Some information logged during execution of {table}")
            # ...

        return alegro_producer

    return [_make_producer(table) for table in tables_to_produce]


alegro_assets = create_alegro_producers(["c_table", "c_prime_table"])
|
|
|
# create jobs & schedules for either a group of producers or individual producers |
|
# sensors could also be used if you want jobs to run after external events instead |
|
# of on cron |
|
|
|
# one job per producer group (SAP) or per individual producer (alegro)
sap_extract_job = define_asset_job(name="sap_extract_job", selection=AssetSelection.groups("SAP"))

# run the whole SAP group every morning
sap_extract_daily = ScheduleDefinition(
    name="sap_extract_daily",
    cron_schedule="0 8 * * *",
    job=sap_extract_job,
)

alegro_extract_job_c = define_asset_job(name="alegro_extract_job_c", selection=AssetSelection.assets(AssetKey(["alegro", "c_table"])))

# c_table refreshes once per day, mid-morning
alegro_extract_daily = ScheduleDefinition(
    name="alegro_extract_daily",
    cron_schedule="30 9 * * *",
    job=alegro_extract_job_c,
)

alegro_extract_job_c_prime = define_asset_job(name="alegro_extract_job_c_prime", selection=AssetSelection.assets(AssetKey(["alegro", "c_prime_table"])))

# c_prime_table refreshes at the top of every hour
alegro_extract_hourly = ScheduleDefinition(
    name="alegro_extract_hourly",
    cron_schedule="0 * * * *",
    job=alegro_extract_job_c_prime,
)
|
|
|
# create consumers, typically the granularity remains 1 asset per table |
|
|
|
@asset(
    group_name="staged",
    key_prefix="staged",
    # upstream dependencies: both SAP producers plus one alegro producer
    deps=[
        AssetKey(["SAP", "a_table"]),
        AssetKey(["SAP", "b_table"]),
        AssetKey(["alegro", "c_prime_table"]),
    ],
    # scheduling condition: propagate any upstream change as soon as possible
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def consumer_d(context: AssetExecutionContext):
    """Stage a downstream table from a_table, b_table and c_prime_table."""
    # ...
|
|
|
@asset(
    group_name="staged",
    key_prefix="staged",
    # upstream dependencies: one SAP producer and one alegro producer
    deps=[
        AssetKey(["SAP", "a_table"]),
        AssetKey(["alegro", "c_table"]),
    ],
    # scheduling condition: eager propagation, but hold off until ALL
    # parents have updated for the partition
    auto_materialize_policy=AutoMaterializePolicy.eager().with_rules(
        AutoMaterializeRule.skip_on_not_all_parents_updated()
    ),
)
def consumer_f(context: AssetExecutionContext):
    """Stage a downstream table from a_table and c_table."""
    # ...
|
|
|
|
|
|
|
# register every asset, job, and schedule with the code location.
# NOTE: the original assets list included ``consumer_g``, which is not
# defined anywhere in this module and raised NameError at import time;
# it has been removed.
defs = Definitions(
    assets=[a_table, b_table, *alegro_assets, consumer_d, consumer_f],
    jobs=[sap_extract_job, alegro_extract_job_c, alegro_extract_job_c_prime],
    schedules=[sap_extract_daily, alegro_extract_daily, alegro_extract_hourly],
)