Skip to content

Instantly share code, notes, and snippets.

@NicolasPA
Created July 31, 2023 10:15
Show Gist options
  • Save NicolasPA/854392e22dc1410977cc7ddb8b8605a4 to your computer and use it in GitHub Desktop.
Save NicolasPA/854392e22dc1410977cc7ddb8b8605a4 to your computer and use it in GitHub Desktop.
Dagster sensor checking if jobs are already running
from dagster import (
RunRequest,
MultiAssetSensorEvaluationContext,
multi_asset_sensor,
AssetSelection,
SkipReason,
RunsFilter,
DagsterRunStatus,
)
# Run statuses passed to RunsFilter below: a run in any of these states counts
# as "existing" (queued, not yet started, starting, or started) for the sensor.
EXISTING_RUNS_STATUSES = [
    DagsterRunStatus.QUEUED,
    DagsterRunStatus.NOT_STARTED,
    DagsterRunStatus.STARTING,
    DagsterRunStatus.STARTED,
]
def _has_existing_runs(instance, job_name):
    """Return True if at least one run of ``job_name`` is in EXISTING_RUNS_STATUSES."""
    # Only existence matters, so cap the query at one record instead of
    # loading every matching run from run storage.
    return bool(
        instance.get_runs(
            RunsFilter(
                job_name=job_name,
                statuses=EXISTING_RUNS_STATUSES,
            ),
            limit=1,
        )
    )


@multi_asset_sensor(
    description="Trigger my_job2 executions when my_monitored_assets are updated. To avoid stacking my_job2 execution "
    "requests everytime my_monitored_assets are updated by my_job1, we wait for my_job1 runs to finish and we check "
    "that my_job2 is not already running.",
    monitored_assets=AssetSelection.assets(*my_monitored_assets),
    job=my_job2,
    minimum_interval_seconds=10 * 60,
)
def updated_my_monitored_assets_sensor(
    context: MultiAssetSensorEvaluationContext,
):
    """Request a my_job2 run when monitored assets were materialized and nothing is running.

    Args:
        context: The multi-asset sensor evaluation context supplied by Dagster.

    Returns:
        A ``RunRequest`` when at least one monitored asset has a new
        materialization record and neither my_job1 nor my_job2 has a run in
        ``EXISTING_RUNS_STATUSES``; otherwise a ``SkipReason`` explaining why
        no run was requested.
    """
    my_monitored_asset_events = context.latest_materialization_records_by_key()

    # Guard clause first: when nothing was materialized there is no point
    # querying run storage at all.
    if not any(my_monitored_asset_events.values()):
        # my_monitored_assets were not updated so there's no point running.
        return SkipReason(
            f"No observed materialization for assets '{[asset_key.to_user_string() for asset_key in my_monitored_asset_events]}'"
        )

    # my_monitored_assets were updated, should we run?
    existing_my_job1_runs = _has_existing_runs(context.instance, my_job1.name)
    existing_my_job2_runs = _has_existing_runs(context.instance, my_job2.name)

    if not existing_my_job1_runs and not existing_my_job2_runs:
        # Nothing is running so let's run!
        # The cursors are advanced only on this path; when we skip below they
        # are left untouched (presumably so the same materializations are
        # re-evaluated on the next tick -- confirm against Dagster docs).
        context.advance_all_cursors()
        return RunRequest()

    # Something is running so we don't run!
    materialized_asset_keys = [
        key.to_user_string()
        for key, value in my_monitored_asset_events.items()
        if value
    ]
    if existing_my_job1_runs and existing_my_job2_runs:
        running_job_names = f"{my_job1.name} and {my_job2.name}"
    elif existing_my_job1_runs:
        running_job_names = my_job1.name
    else:
        running_job_names = my_job2.name
    return SkipReason(
        f"Observed materializations for assets '{materialized_asset_keys}', "
        f"but runs have started for job '{running_job_names}'."
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment