Skip to content

Instantly share code, notes, and snippets.

@kushsharma
Created July 14, 2021 05:54
Show Gist options
  • Save kushsharma/a6a4a7c66acf4b54e84c6a45008bb1f6 to your computer and use it in GitHub Desktop.
Save kushsharma/a6a4a7c66acf4b54e84c6a45008bb1f6 to your computer and use it in GitHub Desktop.
Airflow DAG for automatic scheduler steroids injection.
'''
DAG that automatically injects "steroids" into the Airflow scheduler.
It fixes DAG runs that the scheduler leaves in the running state even though
all of their tasks have completed successfully.
It runs every day and refreshes the DAG runs of the last 7 days. To refresh
older DAG runs on an ad-hoc basis, trigger it manually and pass start_range
as a date like '2021-01-01T00:00:00Z'.
'''
from typing import Any, Callable, Dict, Optional
from datetime import datetime, timedelta, timezone
import logging
from airflow.models import DAG, Variable, DagRun, DagModel, TaskInstance, DagBag
from airflow.configuration import conf
from airflow.utils.state import State
from airflow.utils.weight_rule import WeightRule
from airflow.operators.python import PythonOperator
from airflow.settings import TIMEZONE
# Delay between task retries in seconds. Read from the Airflow Variable
# "dag_retry_delay_in_secs" when the module is parsed; falls back to 5 minutes.
DAG_RETRY_DELAY = int(Variable.get("dag_retry_delay_in_secs", default_var=5 * 60))

# Sentinel for the "start_range" param meaning "no explicit start date given".
default_empty_date = "0000-00-00T00:00:00Z"

default_args = {
    # Params a manual trigger may override; both are plain strings by default.
    "params": {
        "start_range": default_empty_date,
        "refresh_failed": "false",
    },
    "owner": "optimus",
    "depends_on_past": False,
    "retries": 0,
    "retry_delay": timedelta(seconds=DAG_RETRY_DELAY),
    "retry_exponential_backoff": False,
    "priority_weight": 10000,
    # Naive midnight of 2021-07-11.
    "start_date": datetime(2021, 7, 11),
}

# Runs daily at 12:00 (cron "0 12 * * *"), one active run at a time.
# NOTE(review): `catchup` is not set here, so the deployment default applies —
# confirm whether back-filling every day since start_date is intended.
dag = DAG(
    dag_id="__dag_status_refresh",
    default_args=default_args,
    schedule_interval="0 12 * * *",
    max_active_runs=1,
)
def get_dagbag() -> "DagBag":
    """Build a DagBag that reads DAGs from the database and populate it.

    Returns:
        The DagBag after ``collect_dags_from_db()`` has been called on it.
    """
    bag = DagBag(read_dags_from_db=True)
    logging.info("loading dags from db")
    bag.collect_dags_from_db()
    return bag
def _refresh_dag_runs(dag_bag, state, state_label, start_range):
    """Re-evaluate every DAG run currently in ``state`` and count new successes.

    Args:
        dag_bag: DagBag whose ``dag_ids`` are scanned.
        state: DagRun state to look for (e.g. ``State.RUNNING``).
        state_label: Text used for that state in the log messages.
        start_range: Timezone-aware datetime passed to ``DagRun.find`` as
            ``execution_start_date``.

    Returns:
        Number of DAG runs whose state is ``State.SUCCESS`` after the update.

    Raises:
        Exception: If the DagBag cannot resolve one of its own dag ids.
    """
    updated = 0
    for dag_id in dag_bag.dag_ids:
        target_dag = dag_bag.get_dag(dag_id)
        if target_dag is None:
            raise Exception("invalid dag id")
        logging.info("check for dag run in %s state: %s", state_label, target_dag.dag_id)

        dag_runs = DagRun.find(dag_id=dag_id, state=state, execution_start_date=start_range)
        for dag_run in dag_runs:
            logging.info(
                "dag run %s state on %s is in %s state",
                dag_run.dag_id, dag_run.execution_date, dag_run.get_state(),
            )
            # Re-check the state on the object itself before touching the run.
            if dag_run.get_state() != state:
                continue

            # The DAG is attached to the run before update_state() is called.
            dag_run.dag = target_dag
            dag_run.update_state()
            if dag_run.get_state() == State.SUCCESS:
                updated += 1
            logging.info(
                "dag run %s state on %s was in %s state, after update it is in: %s",
                dag_run.dag_id, dag_run.execution_date, state_label, dag_run.get_state(),
            )
    return updated


def inject(**kwargs):
    """Refresh the state of DAG runs that are stuck in the running state.

    Reads two entries from ``kwargs["params"]``:

    * ``start_range``: date string like ``'2021-01-01T00:00:00Z'``. When it is
      missing, ``None``, empty or equal to ``default_empty_date``, the window
      starts 7 days before now.
    * ``refresh_failed``: when it equals "true" (case-insensitive; a boolean
      ``True`` is accepted too), failed DAG runs are re-evaluated as well.

    Args:
        **kwargs: Task context supplied by the operator; only "params" is read.

    Raises:
        ValueError: If ``start_range`` does not match ``%Y-%m-%dT%H:%M:%SZ``.
        Exception: If the DagBag cannot resolve one of its own dag ids.
    """
    params = kwargs["params"]

    raw_start = params.get("start_range")
    if raw_start in (None, "", default_empty_date):
        # Default 7 day range. Use an aware "now": a naive datetime.now() is the
        # host's wall clock, and stamping it with another timezone afterwards
        # would shift the window whenever the two timezones differ.
        start_range = datetime.now(timezone.utc) - timedelta(days=7)
    else:
        # An explicitly supplied date keeps being interpreted in Airflow's
        # configured timezone.
        start_range = datetime.strptime(raw_start, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=TIMEZONE)
    logging.info("injecting success steroids from: %s", start_range)

    dag_bag = get_dagbag()

    # check for state change in running runs
    status_updated = _refresh_dag_runs(dag_bag, State.RUNNING, "running", start_range)

    # str() keeps a JSON boolean passed in the trigger conf from crashing on .lower()
    if str(params.get("refresh_failed")).lower() == "true":
        # check for state change in FAILED runs
        status_updated += _refresh_dag_runs(dag_bag, State.FAILED, "FAILED", start_range)

    logging.info("steroids doped, updated with success: %s dag runs", status_updated)
# The DAG's only task: runs inject() through a PythonOperator.
inject_steroids = PythonOperator(
    task_id="inject_steroids",
    python_callable=inject,
    dag=dag,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment