Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save MikeRzhevsky/28af08afc48ff5ff13f9eded69be91b7 to your computer and use it in GitHub Desktop.
Save MikeRzhevsky/28af08afc48ff5ff13f9eded69be91b7 to your computer and use it in GitHub Desktop.
from sched import scheduler
from dagster import job, op, Failure, sensor, RunRequest,schedule,SkipReason
from .dbt_cube import dbt_cube_and_exposure_incremental
from dagster.core.storage.pipeline_run import (
RunsFilter,
PipelineRunStatus,
)
import time
import datetime
#@schedule(cron_schedule="0 13-22 * * *",job=dbt_cube_and_exposure_incremental,execution_timezone="Europe/Moscow")
#def dbt_cube_and_exposure_incremental_sensor(context):
@sensor(job=dbt_cube_and_exposure_incremental, minimum_interval_seconds=60)
def dbt_cube_and_exposure_incremental_sensor(context):
    """Request a run of ``dbt_cube_and_exposure_incremental`` when none is active.

    Evaluated at most once every 60 seconds. On each evaluation it yields
    exactly one of:

    * ``SkipReason`` when the current wall-clock time is not strictly
      between 17:00 and 22:00;
    * ``SkipReason`` when a run of the job is already queued, starting,
      running or being cancelled;
    * ``RunRequest`` (with ``run_key=None``, i.e. no de-duplication by key)
      otherwise.

    Args:
        context: The sensor evaluation context; ``context.instance`` is used
            to query run records.

    Yields:
        A single ``RunRequest`` or ``SkipReason``.
    """
    # NOTE(review): this is the naive local time of the host evaluating the
    # sensor. The commented-out schedule above used Europe/Moscow -- confirm
    # the host clock is in the intended time zone.
    now = datetime.datetime.now().time()

    # Check the time window first so that no run-storage query is issued
    # outside of it.
    if not (datetime.time(17) < now < datetime.time(22)):
        yield SkipReason("Не тот интервал")
        return

    # A run that has been submitted but has not reached STARTED yet must also
    # count as active; otherwise, because run_key is None, every tick until
    # the run actually starts would launch a duplicate.
    active_runs = context.instance.get_run_records(
        RunsFilter(
            job_name="dbt_cube_and_exposure_incremental",
            statuses=[
                PipelineRunStatus.QUEUED,
                PipelineRunStatus.NOT_STARTED,
                PipelineRunStatus.STARTING,
                PipelineRunStatus.STARTED,
                PipelineRunStatus.CANCELING,
            ],
        ),
        limit=1,  # only existence matters, not the full list
    )
    if active_runs:
        yield SkipReason("Задание работает")
        return

    yield RunRequest(
        run_key=None,
        job_name="dbt_cube_and_exposure_incremental",
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment