Created
July 20, 2021 13:55
-
-
Save 4sushi/d2e36598e94f1dc0e4f812a1a5130bb8 to your computer and use it in GitHub Desktop.
Airflow: use multiple schedule intervals for one DAG. The condition is based on the execution date. Example: run daily + monthly.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import timedelta | |
from airflow import DAG | |
from airflow.utils.dates import days_ago | |
from airflow.operators.python_operator import BranchPythonOperator | |
from airflow.operators.dummy_operator import DummyOperator | |
from airflow.utils.trigger_rule import TriggerRule | |
from croniter import croniter | |
import dateutil.parser | |
# Arguments applied to every task of the DAG unless a task overrides them.
default_args = {
    "start_date": days_ago(1),
    "owner": "XXX",
    "email": ["XXX"],
    # Send a mail on failure only, not on each retry attempt.
    "email_on_failure": True,
    "email_on_retry": False,
    # One retry, five minutes after the failed attempt.
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# The DAG itself is scheduled daily at 01:40; the monthly behaviour is
# decided at run time by a branching task rather than by a second schedule.
dag = DAG(
    dag_id="test",
    default_args=default_args,
    schedule_interval="40 1 * * *",
)
def operator_monthly(**context):
    """Choose the branch to follow from the run's ``ts`` context value.

    The ``ts`` entry of the task context is parsed as an ISO timestamp and
    matched against a cron expression whose day-of-month field is ``1``.

    Returns:
        The task id of ``run_monthly`` when the timestamp matches the cron
        expression, otherwise the task id of ``run_daily``.
    """
    # Minute and hour are wildcards so only the day of month decides.
    monthly_cron = "* * 1 * *"
    # "ts" macro: https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html#
    execution_dt = dateutil.parser.isoparse(context["ts"])
    is_monthly_run = croniter.match(monthly_cron, execution_dt)
    return run_monthly.task_id if is_monthly_run else run_daily.task_id
# Branching task: its callable returns the task id to follow for this run.
check_run_monthly = BranchPythonOperator(
    dag=dag,
    task_id="check_run_monthly",
    python_callable=operator_monthly,
    # Pass the task context (including "ts") to the callable as kwargs.
    provide_context=True,
    # Overrides the default of one retry set in default_args.
    retries=0,
)
# Placeholder task for the monthly branch.
run_monthly = DummyOperator(dag=dag, task_id="run_monthly")

# Placeholder task for the daily work. NONE_FAILED replaces the default
# trigger rule so the task is not blocked when run_monthly is skipped
# (see the Airflow trigger-rule documentation for the exact semantics).
run_daily = DummyOperator(
    dag=dag,
    task_id="run_daily",
    trigger_rule=TriggerRule.NONE_FAILED,
)

# check_run_monthly -> run_monthly -> run_daily
check_run_monthly >> run_monthly
run_monthly >> run_daily
Author
4sushi
commented
Jul 20, 2021
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment