Created
March 18, 2025 03:52
-
-
Save wakatara/0cc6e837360a415114c458e160a9a9f3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
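# Modified from base airflow.sensors.filesystem
# Author: Daryl Manning <[email protected]>
###############################################
# This sensor looks at a directory or directories
# and builds a list of FITS image files found there,
# logging each file's last-modification datetime.
# The intent is to report files changed since the last check;
# NOTE: poke() does not yet remember previous checks, so every
# matching FITS image is reported on each poke.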
from __future__ import annotations | |
import os, datetime, json | |
from glob import glob | |
from typing import Sequence | |
from airflow.hooks.filesystem import FSHook | |
from airflow.sensors.base import BaseSensorOperator | |
from airflow.utils.context import Context | |
from astropy.io import fits | |
class FileSensorFileLists(BaseSensorOperator):
    """
    Wait for 2-D FITS images to land under a folder, then publish their paths.

    Each poke globs ``<connection base path>/<filepath>/**`` and keeps every
    regular file that ``astropy.io.fits`` can open, that has at least one HDU,
    and whose primary header has ``NAXIS == 2``. If at least one such file is
    found, the sorted, de-duplicated list of paths is pushed to XCom as a
    JSON-encoded string under ``xcom_key`` and the poke succeeds. Otherwise
    the sensor keeps waiting.

    NOTE(review): nothing here tracks files already handled, so every matching
    file is reported on each poke. Confirm that downstream tasks expect this.

    :param filepath: Folder name relative to the base path set in the
        connection. It may be a glob matching folders. ``/**`` is always
        appended, so a path naming a single file matches nothing. Templated.
    :param xcom_key: XCom key under which the JSON-encoded list of file paths
        is pushed. Templated.
    :param fs_conn_id: Reference to the File (path) connection id.
    :param recursive: When ``True``, ``**`` matches files in all nested
        subdirectories. When ``False`` (default), it behaves like ``*``, so
        only (non-hidden) files directly inside the matched folder(s) are
        considered.

    .. seealso::
        For more information on the base sensor, see the guide:
        :ref:`howto/operator:FileSensor`
    """

    template_fields: Sequence[str] = ("filepath", "xcom_key")
    ui_color = "#404A8E"

    def __init__(
        self,
        *,
        filepath: str,
        xcom_key: str,
        fs_conn_id: str = "fs_default",
        recursive: bool = False,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.filepath = filepath
        self.fs_conn_id = fs_conn_id
        self.recursive = recursive
        self.xcom_key = xcom_key

    def poke(self, context: Context) -> bool:
        """
        Scan for 2-D FITS images and push their paths to XCom if any exist.

        :return: ``True`` (after pushing the JSON list to XCom) when at least
            one matching file was found, ``False`` otherwise.
        """
        hook = FSHook(self.fs_conn_id)
        basepath = hook.get_path()
        full_path = os.path.join(basepath, self.filepath, "**")
        self.log.info("Poking for files in %s", full_path)

        new_files = []
        # nb: you *must* have the file path passed to be polled as
        # volume in docker-compose or you will get "No new files"
        # sorted(set(...)): glob order depends on the filesystem, and
        # overlapping ``**`` patterns can yield the same path twice. This
        # keeps the XCom payload deterministic and free of duplicates.
        for path in sorted(set(glob(full_path, recursive=self.recursive))):
            if not os.path.isfile(path):
                continue
            try:
                with fits.open(path) as hdulist:
                    # Check file contains 1+ HDU and an n-axis of 2 for image array
                    is_image = (
                        len(hdulist) != 0
                        and hdulist[0].header.get("NAXIS") == 2
                    )
                if not is_image:
                    continue
                mod_time = datetime.datetime.fromtimestamp(
                    os.path.getmtime(path)
                ).strftime("%Y-%m-%d %H:%M:%S")
            except Exception as err:
                # Best-effort scan: non-FITS or unreadable files are skipped
                # rather than failing the sensor. Log the reason so genuine
                # read errors are not silently hidden.
                self.log.info(
                    "%s caused an exception when checked for FITs image: %s",
                    path,
                    err,
                )
                continue
            self.log.info("Found File %s last modified: %s", path, mod_time)
            new_files.append(path)

        if not new_files:
            self.log.info("No New Files (yet)")
            return False

        self.log.info("Pushing unprocessed files list to XCOM")
        # The list is pushed as a JSON string. Downstream consumers presumably
        # json.loads() it, so keep this encoding for compatibility.
        jsonified_filelist = json.dumps(new_files)
        context["task_instance"].xcom_push(key=self.xcom_key, value=jsonified_filelist)
        return True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment