Skip to content

Instantly share code, notes, and snippets.

@adilkhash
Created October 9, 2025 13:40
Show Gist options
  • Save adilkhash/6cea7f38c5df8a9c8c13681a9d55292d to your computer and use it in GitHub Desktop.
import logging
import os
import shutil
from datetime import datetime, timedelta
import pendulum
import structlog
from airflow.sdk import DAG, task
logger = structlog.get_logger(__name__)
@task
def cleanup_airflow_logs(days_to_keep):
    """Delete Airflow log directories older than *days_to_keep* days.

    Walks ``$AIRFLOW_HOME/logs`` (default ``/opt/airflow/logs``) and removes
    every directory whose modification time is older than the cutoff.
    Failures on individual directories are logged and do not abort the sweep.

    :param days_to_keep: age threshold in days; directories with an mtime
        older than ``now - days_to_keep`` are removed.
    """
    log_base_path = os.path.join(os.environ.get("AIRFLOW_HOME", "/opt/airflow"), "logs")
    # Compute the cutoff timestamp once, outside the loop.
    cutoff_ts = (datetime.now() - timedelta(days=days_to_keep)).timestamp()
    for root, dirs, _files in os.walk(log_base_path):
        # Iterate a copy so we can prune `dirs` in place: pruning stops
        # os.walk (topdown) from descending into directories we have just
        # deleted, which would otherwise raise spurious FileNotFoundError
        # for every child of a removed tree.
        for dir_name in list(dirs):
            dir_path = os.path.join(root, dir_name)
            try:
                # NOTE(review): mtime is a coarse heuristic; directory names
                # (dag_id/run_id) may carry more precise date info — confirm
                # before relying on this for retention guarantees.
                if os.path.getmtime(dir_path) < cutoff_ts:
                    logger.info(f"Deleting old log directory: {dir_path}")
                    shutil.rmtree(dir_path)
                    dirs.remove(dir_name)
            except OSError as e:
                # Narrowed from Exception: getmtime/rmtree raise OSError.
                logger.error(f"Error deleting directory {dir_path}: {e}")
# Daily housekeeping DAG: prunes Airflow log directories older than 14 days.
_default_args = {
    "owner": "airflow",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="airflow_log_cleanup_dag",
    schedule="@daily",  # Run daily at midnight
    start_date=pendulum.datetime(2025, 10, 1, tz="Asia/Almaty"),
    catchup=False,
    max_active_runs=1,  # never run two cleanups concurrently
    default_args=_default_args,
) as dag:
    cleanup_airflow_logs(days_to_keep=14)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment