Created August 8, 2023 15:46
Dagster new-files sensors. Detects new files to integrate by computing the difference between the files in the source directory and the list of integrated files stored in a table. A sensor won't trigger a new run while a job is already running, since the output table used to compute the difference is not yet up to date.
import datetime

from dagster import (
    sensor,
    SensorDefinition,
    AssetKey,
    SkipReason,
    RunsFilter,
    DagsterRunStatus,
    SensorEvaluationContext,
)

# Project-specific dependencies assumed by this gist (their import paths are
# not shown): `config` exposes the DATA_FLOWS settings, `runner` provides the
# find_files() and get_not_integrated_files() helpers, `MSSQLWarehouse` wraps
# the warehouse connection, and `update_source_remit_job` is the partitioned
# job the sensors trigger.

# Run statuses that count as "already in flight" when deciding whether to skip.
EXISTING_RUNS_STATUSES = [
    DagsterRunStatus.QUEUED,
    DagsterRunStatus.NOT_STARTED,
    DagsterRunStatus.STARTING,
    DagsterRunStatus.STARTED,
]


def new_remit_files_sensor_factory(data_flow: str) -> SensorDefinition:
    """Build one new-files sensor for the given data flow."""
    sensor_name = f"new_remit_{data_flow}_files_sensor"
    asset_name = f"source_remit_{data_flow}"
    # Fall back to a 10-minute interval when the data flow does not set one.
    sensor_interval = config.DATA_FLOWS[data_flow].get(
        "minimum_sensor_interval", 10 * 60
    )

    @sensor(
        name=sensor_name,
        description=f"""
        Detect new REMIT {data_flow} files to integrate by computing the difference between files in the
        source directory and the list of integrated files stored in the root table of the xml2db output.
        The sensor won't trigger any new run while a job is already running, since the output table used
        to compute the difference is not yet up to date.
        """,
        job=update_source_remit_job,
        minimum_interval_seconds=sensor_interval,
    )
    def new_remit_files_sensor(context: SensorEvaluationContext):
        # Look for in-flight runs triggered by this sensor.
        existing_runs = context.instance.get_runs(
            RunsFilter(
                job_name=update_source_remit_job.name,
                tags={"dagster/sensor_name": sensor_name},  # only from this sensor
                statuses=EXISTING_RUNS_STATUSES,
            ),
        )
        if existing_runs:
            # Skip: the run in flight will update the table the diff relies on.
            yield SkipReason(
                f"Job '{update_source_remit_job.name}' triggered by sensor "
                f"'{sensor_name}' is already running."
            )
        else:
            # Scan the last 30 days of directory dates, i.e. the maximum delay
            # tolerated for new files to be added.
            files_scan_period_days = 30
            scan_end_date = datetime.datetime.today()
            scan_start_date = scan_end_date - datetime.timedelta(
                days=files_scan_period_days
            )
            remit_files = runner.find_files(
                data_flow=data_flow,
                start_date=scan_start_date,
                end_date=scan_end_date,
            )
            engine = MSSQLWarehouse("remit").get_sql_engine()
            missing_files = runner.get_not_integrated_files(
                data_flow=data_flow,
                db_engine=engine,
                file_paths=remit_files,
            )
            # One run request per daily partition that has missing files.
            missing_partitions = {file_date for file_date, _ in missing_files}
            if missing_partitions:
                for file_date in missing_partitions:
                    yield update_source_remit_job.run_request_for_partition(
                        asset_selection=[
                            AssetKey(["source", asset_name]),
                        ],
                        partition_key=file_date.strftime("%Y-%m-%d"),
                        tags={
                            "xml2db": "table1"
                            if data_flow.startswith("table1")
                            else data_flow
                        },
                    )
            else:
                yield SkipReason("No new files detected.")

    return new_remit_files_sensor


# One sensor per configured data flow.
new_remit_files_sensors = [
    new_remit_files_sensor_factory(data_flow) for data_flow in config.DATA_FLOWS
]
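
To make Dagster load the sensors, the list can be passed to the code location's Definitions object together with the job they trigger. A minimal sketch, assuming the gist's names are importable from a module called remit_sensors (that module name is hypothetical, not part of the gist):

from dagster import Definitions

# Hypothetical module path; the gist does not show where these names live.
from remit_sensors import new_remit_files_sensors, update_source_remit_job

defs = Definitions(
    jobs=[update_source_remit_job],
    sensors=new_remit_files_sensors,  # one sensor per data flow in config.DATA_FLOWS
)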
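
A factory-built sensor can also be evaluated locally with Dagster's testing utilities, without a running daemon. A sketch, assuming config.DATA_FLOWS contains a flow named "table1" (the flow name is an assumption); note the sensor body still calls the project's runner and warehouse helpers, so those must be reachable:

from dagster import DagsterInstance, build_sensor_context

instance = DagsterInstance.ephemeral()  # throwaway instance with no prior runs
context = build_sensor_context(instance=instance)
test_sensor = new_remit_files_sensor_factory("table1")  # "table1" is assumed
# Directly invoking a generator-style sensor yields RunRequest / SkipReason objects.
for result in test_sensor(context):
    print(result)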