Created
August 8, 2023 15:50
-
-
Save NicolasPA/5e673242098161f191bff840e4df8e5d to your computer and use it in GitHub Desktop.
Dagster sensor that watches the refresh of the source table assets and then triggers the job that runs the DBT transformations. It avoids stacking run requests by checking what's already running.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dagster import ( | |
RunRequest, | |
MultiAssetSensorEvaluationContext, | |
multi_asset_sensor, | |
AssetSelection, | |
SkipReason, | |
RunsFilter, | |
DagsterRunStatus, | |
) | |
@multi_asset_sensor(
    description="Trigger DBT REMIT models refresh when REMIT source tables are updated. To avoid stacking DBT refresh "
    "requests everytime source tables are updated, we wait for the REMIT source tables update job runs to finish and "
    "we check that the DBT REMIT job is not already running.",
    monitored_assets=AssetSelection.assets(*source_remit_assets),
    job=materialize_dbt_remit_job,
    minimum_interval_seconds=10 * 60,
)
def updated_source_remit_tables_sensor(
    context: MultiAssetSensorEvaluationContext,
):
    """Request a DBT REMIT run once the REMIT source tables have been refreshed.

    The sensor yields one of three outcomes:

    * ``SkipReason`` when none of the monitored assets has a materialization
      record to consume.
    * ``SkipReason`` when materializations were observed but the source update
      job and/or the DBT job still has a run in one of
      ``EXISTING_RUNS_STATUSES``. The cursors are deliberately NOT advanced in
      this case, so the same materializations are seen again on the next tick.
    * ``RunRequest`` when materializations were observed and neither job has a
      matching run. The cursors are advanced first so the events are consumed.

    NOTE(review): ``EXISTING_RUNS_STATUSES`` is defined outside this block;
    presumably it lists the queued / starting / started run statuses - confirm.

    Args:
        context: Evaluation context supplied by Dagster for this sensor tick.

    Returns:
        A ``RunRequest`` for the sensor's job, or a ``SkipReason`` explaining
        why no run was requested.
    """
    asset_events = context.latest_materialization_records_by_key()

    # Nothing new to process: bail out before touching the run storage at all.
    if not any(asset_events.values()):
        return SkipReason(
            f"No observed materialization for assets '{[asset_key.to_user_string() for asset_key in asset_events]}'"
        )

    # Only the existence of a matching run matters, so one row per query is enough.
    existing_source_remit_update_runs = context.instance.get_runs(
        RunsFilter(
            job_name=update_source_remit_job.name,
            statuses=EXISTING_RUNS_STATUSES,
        ),
        limit=1,
    )
    existing_dbt_remit_runs = context.instance.get_runs(
        RunsFilter(
            job_name=materialize_dbt_remit_job.name,
            statuses=EXISTING_RUNS_STATUSES,
        ),
        limit=1,
    )

    # Source tables finished updating and the DBT job is idle: consume the
    # observed events and trigger the DBT job.
    if not existing_source_remit_update_runs and not existing_dbt_remit_runs:
        context.advance_all_cursors()
        return RunRequest()

    # At least one of the two jobs is still busy. Leave the cursors where they
    # are so these materializations are re-evaluated on a later tick.
    materialized_asset_keys = [
        key.to_user_string() for key, value in asset_events.items() if value
    ]
    running_job_names = " and ".join(
        job_name
        for job_name, runs in (
            (update_source_remit_job.name, existing_source_remit_update_runs),
            (materialize_dbt_remit_job.name, existing_dbt_remit_runs),
        )
        if runs
    )
    return SkipReason(
        f"Observed materializations for assets '{materialized_asset_keys}', "
        f"but runs have started for job '{running_job_names}'."
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment