Created
September 11, 2024 02:50
-
-
Save cnolanminich/9106cff7e9f5eec1a0f8d43aca6a781e to your computer and use it in GitHub Desktop.
Schedule that only runs if the upstream asset is fresh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dagster as dg | |
from datetime import timedelta | |
# Upstream asset whose freshness gates the schedule below.
@dg.asset
def upstream_asset(context: dg.AssetExecutionContext) -> None:
    """Log that the upstream asset is being computed; returns nothing."""
    logger = context.log
    logger.info("Upstream asset is being computed")
@dg.asset
def downstream_asset(context: dg.AssetExecutionContext) -> None:
    """Log that the downstream asset is being computed; returns nothing."""
    logger = context.log
    logger.info("downstream asset is being computed")
# Freshness checks for upstream_asset, built with a two-hour lower bound:
# the asset is expected to have been updated within the last 2 hours.
upstream_asset_freshness_checks = dg.build_last_update_freshness_checks(
    lower_bound_delta=timedelta(hours=2),
    assets=[upstream_asset],
)
# Sensor that drives the freshness checks defined above. No interval is passed
# here, so it uses the library default.
# NOTE(review): the original note says freshness is assessed every 30 seconds;
# confirm that this matches the default for your Dagster version.
freshness_checks_sensor = dg.build_sensor_for_freshness_checks(
    freshness_checks=upstream_asset_freshness_checks,
)
# Asset job "my_job" selecting both the upstream and the downstream asset.
scheduled_job = dg.define_asset_job(
    name="my_job",
    selection=[upstream_asset, downstream_asset],
)
# Schedule that ticks once a minute. A tick is skipped when the most recent
# freshness check on upstream_asset failed; otherwise it requests a run of
# scheduled_job.
@dg.schedule(job=scheduled_job, cron_schedule="* * * * *")
def schedule_that_checks(context: dg.ScheduleEvaluationContext):
    """Request a run of the scheduled job unless the upstream freshness check failed.

    Args:
        context: Schedule evaluation context; used for logging and for reading
            asset check summary records from the instance's event log storage.

    Returns:
        ``dg.SkipReason`` when the latest execution record of the freshness
        check on ``upstream_asset`` has status ``FAILED``; ``dg.RunRequest``
        otherwise, including when no check record is available yet.
    """
    # Identify the freshness check attached to upstream_asset.
    # NOTE(review): "freshness_check" is presumably the default check name
    # produced by build_last_update_freshness_checks -- confirm.
    asset_key = dg.AssetKey("upstream_asset")
    check_key = dg.AssetCheckKey(asset_key=asset_key, name="freshness_check")

    # Fetch the summary of that check from the event log storage.
    summary_records = context.instance.event_log_storage.get_asset_check_summary_records(
        [check_key]
    )
    context.log.info(f"Summary records: {summary_records}")

    # .get() may return None when no summary exists for the key; treat that the
    # same as "no execution record" instead of raising AttributeError.
    summary_record = summary_records.get(check_key)
    last_check_execution_record = (
        summary_record.last_check_execution_record
        if summary_record is not None
        else None
    )
    context.log.info(f"Last check execution record: {last_check_execution_record}")

    # Only an explicit FAILED status blocks the run.
    if (
        last_check_execution_record is not None
        and last_check_execution_record.status.value == "FAILED"
    ):
        return dg.SkipReason("Upstream asset is not fresh")
    return dg.RunRequest()
# Definitions object registering the assets, their freshness checks, the
# freshness sensor, and the gated schedule.
defs = dg.Definitions(
    assets=[upstream_asset, downstream_asset],
    asset_checks=upstream_asset_freshness_checks,
    schedules=[schedule_that_checks],
    sensors=[freshness_checks_sensor],
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment