Skip to content

Instantly share code, notes, and snippets.

@cosenal
Created August 5, 2018 16:49
Show Gist options
  • Save cosenal/cbd38b13450b652291e655138baa1aba to your computer and use it in GitHub Desktop.
Save cosenal/cbd38b13450b652291e655138baa1aba to your computer and use it in GitHub Desktop.
stackoverflow.com/questions/51664755/
from datetime import datetime, date, timedelta
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
# The child DAG is built and returned by a factory function.
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
    """Build and return the child DAG containing the Saturday branch.

    The returned DAG has the id ``'<parent_dag_name>.<child_dag_name>'`` and
    four tasks: ``branch_on_saturday`` chooses between ``dummy2`` (today is
    Saturday) and ``today_is_not_saturday`` (any other day); both of those
    are upstream of ``dummy3``.

    Args:
        parent_dag_name: First half of the child DAG's id.
        child_dag_name: Second half of the child DAG's id.
        start_date: Passed through as the child DAG's ``start_date``.
        schedule_interval: Passed through as the child DAG's
            ``schedule_interval``.

    Returns:
        The assembled ``DAG`` object.
    """
    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    dummy2 = DummyOperator(
        task_id='dummy2',
        dag=dag,
    )

    # dummy3 sits downstream of both branch targets, but the branch operator
    # picks only one of them per run, so a single upstream success is enough
    # to trigger it.
    dummy3 = DummyOperator(
        task_id='dummy3',
        dag=dag,
        trigger_rule='one_success',
    )

    def is_saturday():
        """Return the task_id of the branch to follow for today's date."""
        # date.weekday() numbers Monday as 0, so Saturday is 5 (6 is Sunday).
        if date.today().weekday() == 5:
            return 'dummy2'
        return 'today_is_not_saturday'

    branch_on_saturday = BranchPythonOperator(
        task_id='branch_on_saturday',
        python_callable=is_saturday,
        dag=dag,
    )

    not_saturday = DummyOperator(
        task_id='today_is_not_saturday',
        dag=dag,
    )

    # Two alternative paths out of the branch task, joined again at dummy3.
    branch_on_saturday >> dummy2 >> dummy3
    branch_on_saturday >> not_saturday >> dummy3

    return dag
# Parent DAG on a daily-at-midnight cron schedule, with catchup disabled.
dag = DAG(
    'test_branches',
    start_date=datetime(2018, 8, 1),
    schedule_interval='0 0 * * *',
    catchup=False,
    description='Test branches',
)
def python1():
    """Fail unconditionally, simulating a task that errors out.

    Raises:
        Exception: Always, with the message ``'Test failure'``.
    """
    # To simulate a successful task instead, replace the raise below with:
    #     print('Test success')
    raise Exception('Test failure')
# Task in the parent DAG that runs python1(); despite the variable name it is
# a PythonOperator, not a DummyOperator.
dummy1 = PythonOperator(
    dag=dag,
    python_callable=python1,
    task_id='python1',
)
# Wrap the child DAG in a task of the parent DAG.  The operator is bound to
# its own name so that the sub_dag() factory function is not shadowed at
# module level, and the parent's id is read from the DAG object rather than
# repeated as a literal.  The second argument matches task_id below, so the
# child DAG's id comes out as '<parent dag_id>.<task_id>'.
child_dag_task = SubDagOperator(
    subdag=sub_dag(
        dag.dag_id,
        'child_dag',
        dag.start_date,
        dag.schedule_interval,
    ),
    task_id='child_dag',
    dag=dag,
)

# python1 is upstream of the child DAG task.
dummy1 >> child_dag_task
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment