@NicolasPA
Created August 8, 2023 15:46
Dagster new-files sensors. Each sensor detects new files to integrate by computing the difference between the files in the source directory and the list of integrated files stored in a table. It won't trigger any new run while a job is already running, since the output table used to compute the difference is not yet up to date.
import datetime

from dagster import (
    sensor,
    SensorDefinition,
    AssetKey,
    SkipReason,
    RunsFilter,
    DagsterRunStatus,
    SensorEvaluationContext,
)

EXISTING_RUNS_STATUSES = [
    DagsterRunStatus.QUEUED,
    DagsterRunStatus.NOT_STARTED,
    DagsterRunStatus.STARTING,
    DagsterRunStatus.STARTED,
]
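
# The names referenced below but not imported in this gist (config, runner,
# update_source_remit_job, MSSQLWarehouse) are assumed to be provided by the
# project's own modules.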

def new_remit_files_sensor_factory(data_flow: str) -> SensorDefinition:
    sensor_name = f"new_remit_{data_flow}_files_sensor"
    asset_name = f"source_remit_{data_flow}"
    sensor_interval = (
        config.DATA_FLOWS[data_flow]["minimum_sensor_interval"]
        if "minimum_sensor_interval" in config.DATA_FLOWS[data_flow]
        else 10 * 60
    )
    @sensor(
        name=sensor_name,
        description=f"""
        Detect new REMIT {data_flow} files to integrate by computing the difference between the files in the
        source directory and the list of integrated files stored in the root table of the xml2db output.
        The sensor won't trigger any new run while a job is already running, since the output table used to
        compute the difference would not yet be up to date.
        """,
        job=update_source_remit_job,
        minimum_interval_seconds=sensor_interval,
    )
    def new_remit_files_sensor(context: SensorEvaluationContext):
        # Look for runs of this job already launched by this sensor and still in flight
        existing_runs = context.instance.get_runs(
            RunsFilter(
                job_name=update_source_remit_job.name,
                tags={"dagster/sensor_name": sensor_name},  # only runs from this sensor
                statuses=EXISTING_RUNS_STATUSES,
            ),
        )
        # Skip if a job triggered by this sensor is already running
        if existing_runs:
            yield SkipReason(
                f"Job '{update_source_remit_job.name}' triggered by sensor '{sensor_name}' is already running."
            )
        else:
            # Only scan the last 30 days of the source directory: this is the maximum
            # delay with which a new file can arrive and still be picked up.
            files_scan_period_days = 30
            scan_end_date = datetime.datetime.today()
            scan_start_date = scan_end_date - datetime.timedelta(
                days=files_scan_period_days
            )
            remit_files = runner.find_files(
                data_flow=data_flow,
                start_date=scan_start_date,
                end_date=scan_end_date,
            )
            engine = MSSQLWarehouse("remit").get_sql_engine()
            missing_files = runner.get_not_integrated_files(
                data_flow=data_flow,
                db_engine=engine,
                file_paths=remit_files,
            )
            # One run request per file date (partition) that is not integrated yet
            missing_partitions = {file_date for file_date, _ in missing_files}
            if len(missing_partitions) > 0:
                for file_date in missing_partitions:
                    yield update_source_remit_job.run_request_for_partition(
                        asset_selection=[
                            AssetKey(["source", asset_name]),
                        ],
                        partition_key=file_date.strftime("%Y-%m-%d"),
                        tags={
                            "xml2db": "table1"
                            if data_flow.startswith("table1")
                            else data_flow
                        },
                    )
            else:
                yield SkipReason("No new files detected.")

    return new_remit_files_sensor

# One sensor per configured data flow
new_remit_files_sensors = [
    new_remit_files_sensor_factory(data_flow) for data_flow in config.DATA_FLOWS
]
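
For context, a minimal sketch (not part of the original gist) of how the generated sensors might be wired into a Dagster Definitions object, assuming the job and sensors live in the same code location:

# Illustrative only: register the job and the factory-built sensors with Dagster.
from dagster import Definitions

defs = Definitions(
    jobs=[update_source_remit_job],
    sensors=new_remit_files_sensors,
)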