Created
October 9, 2025 13:40
-
-
Save adilkhash/6cea7f38c5df8a9c8c13681a9d55292d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import logging | |
| import os | |
| import shutil | |
| from datetime import datetime, timedelta | |
| import pendulum | |
| import structlog | |
| from airflow.sdk import DAG, task | |
logger = structlog.get_logger(__name__)


@task
def cleanup_airflow_logs(days_to_keep, log_base_path=None):
    """Delete log directories whose modification time is older than a cutoff.

    Walks the Airflow log folder top-down. Any directory whose own mtime is
    more than ``days_to_keep`` days in the past is removed recursively,
    together with everything beneath it. Directories that are kept are
    descended into, so stale sub-directories of an active parent are still
    removed. Individual files are never deleted on their own, only as part
    of a deleted directory.

    NOTE: a directory's mtime changes only when entries directly inside it
    are created, removed or renamed, not when files deeper down are written.
    "Age" is therefore approximate for deeply nested trees.

    :param days_to_keep: Retention window in days (fractions allowed). Must be
        positive. Zero or a negative value would place the cutoff at or after
        "now" and remove every log directory, including this task's own.
    :param log_base_path: Root folder to clean. Defaults to
        ``$AIRFLOW_HOME/logs`` (``/opt/airflow/logs`` when ``AIRFLOW_HOME`` is
        unset). Pass it explicitly if the log folder lives elsewhere.
    :raises ValueError: if ``days_to_keep`` is not positive.
    """
    if days_to_keep <= 0:
        raise ValueError(f"days_to_keep must be positive, got {days_to_keep!r}")

    if log_base_path is None:
        log_base_path = os.environ.get("AIRFLOW_HOME", "/opt/airflow") + "/logs"

    if not os.path.isdir(log_base_path):
        # os.walk would silently yield nothing; make the no-op visible.
        logger.warning(f"Log directory does not exist, nothing to clean: {log_base_path}")
        return

    # Compute the cutoff once, as an epoch timestamp directly comparable with
    # os.path.getmtime(). Subtracting seconds from an epoch value avoids the
    # DST skew of doing arithmetic on a naive local datetime.
    cutoff_ts = datetime.now().timestamp() - timedelta(days=days_to_keep).total_seconds()
    deleted = 0

    for root, dirs, _files in os.walk(log_base_path):
        still_present = []
        for dir_name in dirs:
            dir_path = os.path.join(root, dir_name)
            # os.walk lists symlinks to directories in `dirs` but does not follow
            # them; shutil.rmtree refuses symlinks, so leave them untouched.
            if os.path.islink(dir_path):
                continue
            try:
                if os.path.getmtime(dir_path) >= cutoff_ts:
                    still_present.append(dir_name)
                    continue
                logger.info(f"Deleting old log directory: {dir_path}")
                shutil.rmtree(dir_path)
                deleted += 1
            except OSError as e:
                # Best effort: report and move on. The directory is not
                # descended into, because it may have vanished or been only
                # partially removed.
                logger.error(f"Error deleting directory {dir_path}: {e}")
        # Prune in place so os.walk does not try to descend into directories
        # that were just removed (topdown mode honours edits to `dirs`).
        dirs[:] = still_present

    logger.info(f"Removed {deleted} old log directories under {log_base_path}")
# Defaults inherited by every task in this DAG: owned by "airflow", retried
# twice with a five-minute pause between attempts.
_cleanup_default_args = dict(
    owner="airflow",
    retries=2,
    retry_delay=timedelta(minutes=5),
)

# Daily log-retention DAG. "@daily" fires at midnight, evaluated in the
# timezone of start_date (Asia/Almaty). catchup=False stops the scheduler from
# back-filling every interval since start_date. max_active_runs=1 prevents
# two cleanup runs from walking the log tree at the same time.
with DAG(
    dag_id="airflow_log_cleanup_dag",
    schedule="@daily",
    start_date=pendulum.datetime(2025, 10, 1, tz="Asia/Almaty"),
    catchup=False,
    max_active_runs=1,
    default_args=_cleanup_default_args,
) as dag:
    # Retain two weeks of logs.
    cleanup_airflow_logs(days_to_keep=14)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment