Created July 31, 2023 10:15
-
-
Save NicolasPA/854392e22dc1410977cc7ddb8b8605a4 to your computer and use it in GitHub Desktop.
Dagster sensor checking if jobs are already running
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dagster import ( | |
RunRequest, | |
MultiAssetSensorEvaluationContext, | |
multi_asset_sensor, | |
AssetSelection, | |
SkipReason, | |
RunsFilter, | |
DagsterRunStatus, | |
) | |
# Run statuses that mean a run has not finished yet: queued, being launched,
# executing, or being canceled. The sensor below treats any my_job1 / my_job2
# run in one of these statuses as "existing" and skips requesting a new run.
EXISTING_RUNS_STATUSES = [
    DagsterRunStatus.QUEUED,
    DagsterRunStatus.NOT_STARTED,
    DagsterRunStatus.STARTING,
    DagsterRunStatus.STARTED,
    # A CANCELING run is still in progress until it reaches CANCELED, so it must
    # block a new request as well (Dagster's own in-progress set includes it).
    DagsterRunStatus.CANCELING,
]
@multi_asset_sensor(
    description="Trigger my_job2 executions when my_monitored_assets are updated. To avoid stacking my_job2 execution "
    "requests everytime my_monitored_assets are updated by my_job1, we wait for my_job1 runs to finish and we check "
    "that my_job2 is not already running.",
    monitored_assets=AssetSelection.assets(*my_monitored_assets),
    job=my_job2,
    minimum_interval_seconds=10 * 60,
)
def updated_my_monitored_assets_sensor(
    context: MultiAssetSensorEvaluationContext,
):
    """Request a ``my_job2`` run when the monitored assets were materialized and
    neither ``my_job1`` nor ``my_job2`` has a run in ``EXISTING_RUNS_STATUSES``.

    Evaluated at most every 10 minutes (``minimum_interval_seconds``). Cursors
    are advanced only when a run is requested. Per Dagster's multi-asset sensor
    cursor semantics, materializations observed while a job is still running are
    therefore not consumed and are seen again on a later evaluation.

    Args:
        context: Evaluation context provided by ``multi_asset_sensor``.

    Returns:
        ``RunRequest`` to launch ``my_job2``, or ``SkipReason`` explaining why
        no run was requested.
    """
    my_monitored_asset_events = context.latest_materialization_records_by_key()

    if not any(my_monitored_asset_events.values()):
        # my_monitored_assets were not updated so there's no point running.
        # Return before touching run storage: the run queries below are only
        # needed once there is something to react to.
        return SkipReason(
            f"No observed materialization for assets '{[asset_key.to_user_string() for asset_key in my_monitored_asset_events]}'"
        )

    # my_monitored_assets were updated, should we run?
    # Only the existence of an unfinished run matters, so fetch at most one.
    existing_my_job1_runs = context.instance.get_runs(
        RunsFilter(
            job_name=my_job1.name,
            statuses=EXISTING_RUNS_STATUSES,
        ),
        limit=1,
    )
    existing_my_job2_runs = context.instance.get_runs(
        RunsFilter(
            job_name=my_job2.name,
            statuses=EXISTING_RUNS_STATUSES,
        ),
        limit=1,
    )

    if not existing_my_job1_runs and not existing_my_job2_runs:
        # Nothing is running so let's run! Consume the observed
        # materializations so they don't trigger another request.
        context.advance_all_cursors()
        return RunRequest()

    # Something is running so we don't run. Cursors are deliberately left
    # untouched so these materializations are reconsidered on a later tick.
    materialized_asset_keys = [
        key.to_user_string()
        for key, value in my_monitored_asset_events.items()
        if value
    ]
    running_job_names = (
        f"{my_job1.name} and {my_job2.name}"
        if existing_my_job1_runs and existing_my_job2_runs
        else (my_job1.name if existing_my_job1_runs else my_job2.name)
    )
    return SkipReason(
        f"Observed materializations for assets '{materialized_asset_keys}', "
        f"but runs have started for job '{running_job_names}'."
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.