# Source: GitHub gist by @wakatara (gist 0cc6e837360a415114c458e160a9a9f3),
# created March 18, 2025 03:52.
# Modified from base airflow.sensors.filesystem
# Author: Daryl Manning <[email protected]>
###############################################
# This sensor looks at a directory or directories
# and builds a list of the FITS image files found
# there, logging each file's modification datetime.
# It does not remember which files were seen on a
# previous check.
from __future__ import annotations

import datetime
import json
import os
from glob import escape, glob
from typing import Sequence

from airflow.hooks.filesystem import FSHook
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
from astropy.io import fits

class FileSensorFileLists(BaseSensorOperator):
    """
    Wait for FITS image files to land in a filesystem, then publish their paths.

    Adapted from ``airflow.sensors.filesystem.FileSensor``. On each poke the
    ``filepath`` glob is resolved relative to the base path of the
    ``fs_conn_id`` connection. A match that is a file is a candidate as-is. A
    match that is a directory is expanded with a trailing ``**`` glob.

    A candidate is kept only if all of the following hold:

    * astropy can open it as FITS;
    * it has at least one HDU;
    * its primary HDU header has ``NAXIS == 2`` (a 2-D image array).

    If at least one file is kept, the sorted, de-duplicated list of paths is
    JSON-encoded and pushed to XCom under ``xcom_key``, and the sensor
    succeeds. Otherwise it keeps poking.

    Every poke reports all matching files. Nothing is remembered between
    pokes, so files found earlier are reported again.

    :param filepath: File or folder name (relative to the base path set within
        the connection); can be a glob. Templated.
    :param xcom_key: XCom key under which the JSON-encoded list of file paths
        is pushed. Templated.
    :param fs_conn_id: reference to the File (path) connection id.
    :param recursive: when set to ``True``, enables recursive directory
        matching behavior of ``**`` in glob patterns, including the ``**``
        appended to matched directories. Defaults to ``False``, in which case
        only the direct children of a matched directory are checked.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/operator:FileSensor`
    """

    template_fields: Sequence[str] = ("filepath", "xcom_key")
    ui_color = "#404A8E"

    def __init__(self, *, filepath, xcom_key, fs_conn_id="fs_default", recursive=False, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath
        self.fs_conn_id = fs_conn_id
        self.recursive = recursive
        self.xcom_key = xcom_key

    def _candidate_files(self, basepath: str) -> list[str]:
        """Return the sorted, de-duplicated regular files matched by ``filepath``.

        Glob matches that are files are used directly. Glob matches that are
        directories are expanded with a trailing ``**``. For a plain
        directory ``filepath`` this is the same set the sensor has always
        scanned.
        """
        pattern = os.path.join(basepath, self.filepath)
        self.log.info("Poking for files in %s", pattern)
        found = set()
        for match in glob(pattern, recursive=self.recursive):
            if os.path.isfile(match):
                found.add(match)
            elif os.path.isdir(match):
                # Escape the concrete directory path so characters such as "["
                # in its name are not re-interpreted as glob syntax.
                dir_pattern = os.path.join(escape(match), "**")
                found.update(
                    path
                    for path in glob(dir_pattern, recursive=self.recursive)
                    if os.path.isfile(path)
                )
        # Sorting makes the XCom payload deterministic; glob order is arbitrary.
        return sorted(found)

    def _check_fits_image(self, path: str) -> bool:
        """Return True if *path* is a FITS file with a 2-D primary HDU.

        Logs the file's modification time when it qualifies. Any error while
        opening, inspecting or stat-ing the file is logged, and the file is
        skipped rather than failing the whole poke.
        """
        try:
            with fits.open(path) as hdulist:
                # Require 1+ HDU and an n-axis of 2 (image array) in the primary HDU.
                if len(hdulist) == 0 or hdulist[0].header.get("NAXIS") != 2:
                    return False
            mod_time = datetime.datetime.fromtimestamp(
                os.path.getmtime(path)
            ).strftime("%Y-%m-%d %H:%M:%S")
        except Exception as exc:  # best-effort: skip anything we cannot read
            self.log.info("%s caused an exception when checked for FITs image: %s", path, exc)
            return False
        self.log.info("Found File %s last modified: %s", path, mod_time)
        return True

    def poke(self, context: Context) -> bool:
        """Look for FITS images and push their paths to XCom if any are found.

        :return: ``True`` when at least one FITS image was found. In that case
            a JSON-encoded list of paths has been pushed to XCom under
            ``xcom_key``. Otherwise ``False``.
        """
        basepath = FSHook(self.fs_conn_id).get_path()
        # nb: the polled path must be mounted into the worker (e.g. as a
        # docker-compose volume) or every poke will report "No New Files".
        new_files = [
            path for path in self._candidate_files(basepath) if self._check_fits_image(path)
        ]
        if not new_files:
            self.log.info("No New Files (yet)")
            return False
        self.log.info("Pushing unprocessed files list to XCOM")
        # XCom data needs to be JSON serializable, so the list is pushed as a JSON string.
        context["task_instance"].xcom_push(key=self.xcom_key, value=json.dumps(new_files))
        return True
# End of gist.