Created
July 14, 2021 05:54
-
-
Save kushsharma/a6a4a7c66acf4b54e84c6a45008bb1f6 to your computer and use it in GitHub Desktop.
Airflow DAG for automatic scheduler steroids injection.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
DAG for automatic scheduler steroids injection.
This fixes the Airflow scheduler leaving DAG runs in the running state even
though their tasks have completed successfully.
It runs every day and refreshes the DAG runs of the last 7 days. To refresh
older DAG runs on an ad-hoc basis, trigger the DAG manually and pass
start_range as a date like '2021-01-01T00:00:00Z'. Pass refresh_failed as
'true' to also re-evaluate DAG runs that are in the failed state.
'''
from typing import Any, Callable, Dict, Optional | |
from datetime import datetime, timedelta, timezone | |
import logging | |
from airflow.models import DAG, Variable, DagRun, DagModel, TaskInstance, DagBag | |
from airflow.configuration import conf | |
from airflow.utils.state import State | |
from airflow.utils.weight_rule import WeightRule | |
from airflow.operators.python import PythonOperator | |
from airflow.settings import TIMEZONE | |
# Retry delay in seconds, read from the Airflow Variable
# "dag_retry_delay_in_secs"; 5 minutes is passed as the default value.
DAG_RETRY_DELAY = int(Variable.get("dag_retry_delay_in_secs", default_var=5 * 60))

# Placeholder for the "start_range" param; inject() treats this value as
# "no explicit start date supplied" and uses its default range instead.
default_empty_date = "0000-00-00T00:00:00Z"

# Arguments applied to every task of this DAG, including the default params.
default_args = dict(
    params=dict(
        start_range=default_empty_date,
        refresh_failed="false",
    ),
    owner="optimus",
    depends_on_past=False,
    retries=0,
    retry_delay=timedelta(seconds=DAG_RETRY_DELAY),
    retry_exponential_backoff=False,
    priority_weight=10000,
    start_date=datetime(2021, 7, 11, 0, 0, 0),
)

# Runs on the cron schedule "0 12 * * *" with at most one active run.
dag = DAG(
    dag_id="__dag_status_refresh",
    schedule_interval="0 12 * * *",
    max_active_runs=1,
    default_args=default_args,
)
def get_dagbag():
    """Return a DagBag created with ``read_dags_from_db=True`` and populated
    through ``collect_dags_from_db()``."""
    db_backed_bag = DagBag(read_dags_from_db=True)
    logging.info("loading dags from db")
    db_backed_bag.collect_dags_from_db()
    return db_backed_bag
def _resolve_start_range(params):
    """Return the timezone-aware lower bound of execution dates to refresh.

    Uses ``params["start_range"]`` (format ``%Y-%m-%dT%H:%M:%SZ``) when it is
    set to something other than the empty placeholder; otherwise defaults to
    7 days before the current time.

    Raises:
        ValueError: if ``start_range`` is set but does not match the format.
    """
    raw_start = params.get("start_range")
    if raw_start and raw_start != default_empty_date:
        parsed = datetime.strptime(raw_start, "%Y-%m-%dT%H:%M:%SZ")
        # The accepted format ends in a literal "Z", i.e. the value is UTC,
        # so label it as UTC rather than with whatever zone is configured.
        return parsed.replace(tzinfo=timezone.utc)
    # Take an aware "now" directly: stamping a naive local-clock value with a
    # timezone afterwards shifts the range when the host zone differs.
    return datetime.now(timezone.utc) - timedelta(days=7)


def _is_enabled(flag):
    """Return True when ``flag`` is the string "true" (any case) or boolean True."""
    # str() keeps this safe for None and for real booleans coming from JSON conf.
    return str(flag).strip().lower() == "true"


def _refresh_dag_runs(dag_bag, state, label, start_range):
    """Re-evaluate every DAG run currently in ``state`` and count the ones
    that end up in the success state.

    Args:
        dag_bag: bag providing ``dag_ids`` and ``get_dag``.
        state: DAG run state to look for (e.g. ``State.RUNNING``).
        label: how ``state`` is spelled in the log messages.
        start_range: passed to ``DagRun.find`` as ``execution_start_date``.

    Returns:
        Number of DAG runs whose state is success after ``update_state()``.

    Raises:
        Exception: if the bag lists a dag id it cannot return a DAG for.
    """
    updated = 0
    for dag_id in dag_bag.dag_ids:
        target_dag = dag_bag.get_dag(dag_id)
        if target_dag is None:
            raise Exception("invalid dag id")

        logging.info("check for dag run in %s state: %s", label, target_dag.dag_id)
        candidates = DagRun.find(dag_id=dag_id, state=state, execution_start_date=start_range)
        for dag_run in candidates:
            logging.info(
                "dag run %s state on %s is in %s state",
                dag_run.dag_id, dag_run.execution_date, dag_run.get_state(),
            )
            if dag_run.get_state() != state:
                continue

            # The run needs its DAG attached before update_state() is called.
            dag_run.dag = target_dag
            dag_run.update_state()

            if dag_run.get_state() == State.SUCCESS:
                updated += 1
            logging.info(
                "dag run %s state on %s was in %s state, after update it is in: %s",
                dag_run.dag_id, dag_run.execution_date, label, dag_run.get_state(),
            )
    return updated


def inject(**kwargs):
    """Refresh the state of DAG runs left in the running state.

    Reads two entries from ``kwargs["params"]``:

    * ``start_range`` - optional ``YYYY-MM-DDTHH:MM:SSZ`` lower bound for the
      execution dates to inspect; defaults to the last 7 days.
    * ``refresh_failed`` - when "true", runs in the failed state are
      re-evaluated as well.

    Every matching run gets ``update_state()`` called on it; the number of
    runs that end up in the success state is logged.
    """
    params = kwargs.get("params") or {}

    start_range = _resolve_start_range(params)
    logging.info("injecting success steroids from: %s", start_range)

    dag_bag = get_dagbag()

    # check for state change in running runs
    status_updated = _refresh_dag_runs(dag_bag, State.RUNNING, "running", start_range)

    if _is_enabled(params.get("refresh_failed")):
        # check for state change in FAILED runs
        status_updated += _refresh_dag_runs(dag_bag, State.FAILED, "FAILED", start_range)

    logging.info("steroids doped, updated with success: %s dag runs", status_updated)
# The DAG's only task: invokes inject() through a PythonOperator.
inject_steroids = PythonOperator(
    python_callable=inject,
    task_id="inject_steroids",
    dag=dag,
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment