Created
August 5, 2018 16:49
-
-
Save cosenal/cbd38b13450b652291e655138baa1aba to your computer and use it in GitHub Desktop.
stackoverflow.com/questions/51664755/
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import date, datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
    """Build and return the child DAG that branches on the day of the week.

    The DAG is returned by this factory method; its ``dag_id`` is
    ``'<parent_dag_name>.<child_dag_name>'``.

    Task layout::

        branch_on_saturday -> dummy2 ----------------> dummy3
        branch_on_saturday -> today_is_not_saturday -> dummy3

    Args:
        parent_dag_name: ``dag_id`` of the parent DAG (first half of the id).
        child_dag_name: Name of this child DAG (second half of the id).
        start_date: Passed through to ``DAG(start_date=...)``.
        schedule_interval: Passed through to ``DAG(schedule_interval=...)``.

    Returns:
        The assembled ``DAG`` containing the four tasks above.
    """
    # date.weekday() numbers days Monday=0 .. Sunday=6, so Saturday is 5.
    saturday = 5

    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    dummy2 = DummyOperator(
        task_id='dummy2',
        dag=dag,
    )

    # Join task for both branches. Only one branch is followed per run, so
    # the join is triggered as soon as one upstream task has succeeded.
    dummy3 = DummyOperator(
        task_id='dummy3',
        dag=dag,
        trigger_rule='one_success',
    )

    def is_saturday():
        """Return the task_id of the branch to follow for today's date.

        NOTE(review): this uses the wall-clock date (``date.today()``), not
        the DAG run's execution date -- confirm that is intended.
        """
        if date.today().weekday() == saturday:
            return 'dummy2'
        return 'today_is_not_saturday'

    branch_on_saturday = BranchPythonOperator(
        task_id='branch_on_saturday',
        python_callable=is_saturday,
        dag=dag,
    )

    not_saturday = DummyOperator(
        task_id='today_is_not_saturday',
        dag=dag,
    )

    branch_on_saturday >> dummy2 >> dummy3
    branch_on_saturday >> not_saturday >> dummy3

    return dag
# Parent DAG. The cron expression '0 0 * * *' fires once a day at midnight;
# catchup is disabled.
dag = DAG(
    dag_id='test_branches',
    start_date=datetime(2018, 8, 1),
    schedule_interval='0 0 * * *',
    catchup=False,
    description='Test branches',
)
def python1():
    """Fail unconditionally by raising ``Exception('Test failure')``."""
    failure_message = 'Test failure'
    raise Exception(failure_message)
# Name of the child DAG. SubDagOperator expects the subdag's dag_id to be
# '<parent dag_id>.<task_id>', so one constant feeds both places.
CHILD_DAG_NAME = 'child_dag'

# Upstream task; its callable (python1) raises, so this task fails.
dummy1 = PythonOperator(
    task_id='python1',
    python_callable=python1,
    dag=dag,
)

# Bound to its own name so the sub_dag() factory function is not shadowed
# by the operator it produces.
child_dag_task = SubDagOperator(
    subdag=sub_dag(
        dag.dag_id,
        CHILD_DAG_NAME,
        dag.start_date,
        dag.schedule_interval,
    ),
    task_id=CHILD_DAG_NAME,
    dag=dag,
)

# The sub-DAG runs downstream of the failing task.
dummy1 >> child_dag_task
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment