Skip to content

Instantly share code, notes, and snippets.

@NicolasPA
Created August 8, 2023 15:50
Show Gist options
  • Save NicolasPA/5e673242098161f191bff840e4df8e5d to your computer and use it in GitHub Desktop.
Save NicolasPA/5e673242098161f191bff840e4df8e5d to your computer and use it in GitHub Desktop.
Dagster sensor that watches the refresh of the source table assets and then triggers the job that runs the DBT transformations. It avoids stacking run requests by checking what's already running.
from dagster import (
RunRequest,
MultiAssetSensorEvaluationContext,
multi_asset_sensor,
AssetSelection,
SkipReason,
RunsFilter,
DagsterRunStatus,
)
@multi_asset_sensor(
    description="Trigger DBT REMIT models refresh when REMIT source tables are updated. To avoid stacking DBT refresh "
    "requests everytime source tables are updated, we wait for the REMIT source tables update job runs to finish and "
    "we check that the DBT REMIT job is not already running.",
    monitored_assets=AssetSelection.assets(*source_remit_assets),
    job=materialize_dbt_remit_job,
    minimum_interval_seconds=10 * 60,
)
def updated_source_remit_tables_sensor(
    context: MultiAssetSensorEvaluationContext,
):
    """Request a DBT REMIT run once the REMIT source tables have been refreshed.

    Decision made on each evaluation:

    * No monitored asset has a latest materialization record: skip.
    * At least one has, but runs of the source update job or of the DBT job
      exist in one of ``EXISTING_RUNS_STATUSES``: skip and leave the cursors
      untouched, so the same materializations are looked at again on the
      next evaluation.
    * At least one has, and no such run exists: advance every cursor and
      request a run of the sensor's job.

    Args:
        context: Evaluation context supplied by Dagster; used to read the
            latest materialization records, to query runs on the instance
            and to advance the per-asset cursors.

    Returns:
        A ``RunRequest`` when the DBT job should be launched, otherwise a
        ``SkipReason`` explaining why nothing was launched.
    """
    asset_events = context.latest_materialization_records_by_key()

    # Guard clause first: when nothing was materialized there is no need to
    # query the instance for runs at all.
    if not any(asset_events.values()):
        return SkipReason(
            f"No observed materialization for assets '{[asset_key.to_user_string() for asset_key in asset_events]}'"
        )

    # NOTE(review): EXISTING_RUNS_STATUSES is defined outside this block;
    # presumably the queued / starting / in-progress statuses -- confirm.
    existing_source_remit_update_runs = context.instance.get_runs(
        RunsFilter(
            job_name=update_source_remit_job.name,
            statuses=EXISTING_RUNS_STATUSES,
        ),
    )
    existing_dbt_remit_runs = context.instance.get_runs(
        RunsFilter(
            job_name=materialize_dbt_remit_job.name,
            statuses=EXISTING_RUNS_STATUSES,
        ),
    )

    # Source tables finished updating and the DBT job is not already running.
    if not existing_source_remit_update_runs and not existing_dbt_remit_runs:
        context.advance_all_cursors()
        return RunRequest()

    materialized_asset_keys = [
        key.to_user_string() for key, value in asset_events.items() if value
    ]
    # At least one of the two run lists is non-empty here, so this yields
    # "<source job>", "<dbt job>" or "<source job> and <dbt job>".
    running_job_names = " and ".join(
        job.name
        for job, runs in (
            (update_source_remit_job, existing_source_remit_update_runs),
            (materialize_dbt_remit_job, existing_dbt_remit_runs),
        )
        if runs
    )
    return SkipReason(
        f"Observed materializations for assets '{materialized_asset_keys}', "
        f"but runs have started for job '{running_job_names}'."
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment