A toy example of a DAG definition file in Airflow
""" | |
A DAG docstring might be a good way to explain at a high level | |
what problem space the DAG is looking at. | |
Links to design documents, upstream dependencies etc | |
are highly recommended. | |
""" | |
from datetime import datetime, timedelta

from airflow.models import DAG  # Import the DAG class
from airflow.operators.sensors import NamedHivePartitionSensor
from airflow.operators.hive_operator import HiveOperator
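# Note: these import paths match Airflow 1.x; in Airflow 2.x the Hive operator
# and sensor instead live in the apache-airflow-providers-apache-hive package.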
### You can import more operators as you see fit!
# from airflow.operators.bash_operator import BashOperator
# from airflow.operators.python_operator import PythonOperator

# Default arguments, applied to every task in this DAG
default_args = {
    'owner': 'you',
    'depends_on_past': False,
    'start_date': datetime(2018, 2, 9),
}

# Instantiate the Airflow DAG
dag = DAG(
    dag_id='anatomy_of_a_dag',
    description="This describes my DAG",
    default_args=default_args,
    schedule_interval=timedelta(days=1),  # This is a daily DAG.
)

# Put upstream dependencies in a dictionary
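# {{ ds }} is an Airflow template macro that renders to the execution date
# (YYYY-MM-DD) at run time, so each run waits on that day's partitions.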
wf_dependencies = {
    'wf_upstream_table_1': 'upstream_table_1/ds={{ ds }}',
    'wf_upstream_table_2': 'upstream_table_2/ds={{ ds }}',
    'wf_upstream_table_3': 'upstream_table_3/ds={{ ds }}',
}

# Define the sensors for upstream dependencies
for wf_task_id, partition_name in wf_dependencies.items():
    NamedHivePartitionSensor(
        task_id=wf_task_id,
        partition_names=[partition_name],
        dag=dag,
    )

# Put the tasks in a list
tasks = [
    ('hql', 'task_1'),
    ('hql', 'task_2'),
]

# Define the operators in the list above
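# hql is a templated field on HiveOperator: a path ending in .hql is read
# from disk and rendered with Jinja before the query is submitted to Hive.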
for directory, task_name in tasks:
    HiveOperator(
        task_id=task_name,
        hql='{0}/{1}.hql'.format(directory, task_name),
        dag=dag,
    )

# Put the dependencies in a map
deps = {
    'task_1': [
        'wf_upstream_table_1',
        'wf_upstream_table_2',
    ],
    'task_2': [
        'wf_upstream_table_1',
        'wf_upstream_table_2',
        'wf_upstream_table_3',
    ],
}

# Explicitly define the dependencies in the DAG
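# set_dependency looks both tasks up by task_id on this DAG; it is
# equivalent to wiring the operators with upstream_task >> downstream_task.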
for downstream, upstream_list in deps.items():
    for upstream in upstream_list:
        dag.set_dependency(upstream, downstream)
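
# A quick, optional sanity check (a minimal sketch, assuming the Airflow 1.x
# DAG API): run this file directly to print each task with its upstream task
# ids and confirm the loops above wired the graph up as intended.
if __name__ == '__main__':
    for task in dag.tasks:
        print('{0} <- {1}'.format(task.task_id, sorted(task.upstream_task_ids)))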