A guide to the basic concepts of Airflow and how to implement data engineering workflows in production
By Mike Metzger, Data Engineer Consultant @ Flexible Creations
An introduction to the components of Apache Airflow and why to use them
- Airflow is a platform to program data engineering workflows
- Handles creation, scheduling, and monitoring of workflows
- Can implement programs from any language, but the workflows themselves are written in Python
- Implements workflows as Directed Acyclic Graphs (DAGs)
- A DAG represents the set of tasks that make up your workflow
- Made up of components (typically tasks) to be executed, such as operators, sensors, etc.
- Contains dependencies defined explicitly or implicitly
- Accessed via code, the command line, or the web interface
from airflow.models import DAG
from datetime import datetime

# A simple DAG definition
etl_dag = DAG(
    dag_id='etl_pipeline',
    default_args={"start_date": "2020-01-08"}
)

# default_args are applied to every task within the DAG
default_arguments = {
    'owner': 'jdoe',
    'email': '[email protected]',
    'start_date': datetime(2020, 1, 20)
}

etl_dag = DAG('etl_workflow', default_args=default_arguments)
> airflow run example-etl download-file 2020-01-10
Basics of implementing Airflow DAGs using operators, tasks, and scheduling
- Operators represent a single task in a workflow and (usually) run independently
- May not run in the same location / environment
- Difficult to run tasks with elevated privileges
- Task dependencies define a given order of task completion
- An upstream task must be executed before its downstream task
- BashOperator executes a given Bash command or script
- PythonOperator executes a Python function / callable
- Supports arguments to tasks via the op_kwargs dictionary
- EmailOperator sends an email
- Requires the Airflow system to be configured with email server details
- A DAG Run is a specific instance of a workflow at a point in time
- Can be run manually or via the schedule_interval
- Schedule attributes: start_date, end_date, max_tries, schedule_interval
- schedule_interval can be defined with cron-style syntax or with built-in presets: @hourly, @daily, ..., @once, None (see the sketch below)
- Airflow maintains state for each workflow and the tasks within: running, failed, success
- Airflow will schedule the task at start_date + schedule_interval
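A minimal sketch of setting a schedule (the dag_id, dates, and cron expression are illustrative, not taken from the notes above):
from airflow.models import DAG
from datetime import datetime

# Cron syntax: run every day at 12:00; the '@daily' preset would run at midnight instead
report_dag = DAG(
    dag_id='daily_report',
    default_args={'start_date': datetime(2020, 2, 20)},
    schedule_interval='0 12 * * *'
)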
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
import time

# BashOperator runs a shell command or script
example_task = BashOperator(task_id='bash_ex',
                            bash_command='echo 1',
                            dag=dag)

bash_task = BashOperator(task_id='clean_addresses',
                         bash_command='cat addresses.txt | awk "NF==10" > cleaned.txt',
                         dag=dag)

# PythonOperator calls a Python function, passing arguments via op_kwargs
def sleep(length_of_time):
    time.sleep(length_of_time)

sleep_task = PythonOperator(
    task_id='sleep',
    python_callable=sleep,
    op_kwargs={'length_of_time': 5},
    dag=example_dag
)

# EmailOperator sends a message (requires a configured email server)
email_task = EmailOperator(
    task_id='email_sales_report',
    to='[email protected]',
    subject='Automated Sales Report',
    html_content='Attached is the latest sales report',
    files=['latest_sales.xlsx'],
    dag=example_dag
)

# Bitshift operators set dependencies: upstream >> downstream
example_task >> bash_task << sleep_task
bash_task >> email_task
Airflow components such as sensors and executors, plus monitoring and troubleshooting Airflow workflows
- A sensor is an operator that waits for a certain condition to be true
- mode=poke -- the default; the sensor runs repeatedly, keeping its task slot
- mode=reschedule -- give up the task slot and try again later
- poke_interval -- how long to wait between checks
- timeout -- how long to wait before failing the task
- Examples: FileSensor, HttpSensor, SqlSensor, ExternalTaskSensor
- Many others in airflow.sensors and airflow.contrib.sensors
- Airflow executors run tasks
- Different executors handle running the tasks differently
- SequentialExecutor -- the default executor; runs one task at a time
- LocalExecutor -- runs on a single system; treats tasks as processes
- CeleryExecutor -- multiple worker systems; uses a Celery backend as the task manager
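The executor is typically chosen in the [core] section of airflow.cfg (or via the AIRFLOW__CORE__EXECUTOR environment variable); the value below is only an illustrative configuration, not a recommendation:
# airflow.cfg, [core] section
executor = LocalExecutor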
- Debugging and troubleshooting in Airflow
- DAG won't run on schedule
- DAG won't load
- Syntax errors
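Two quick checks when a DAG won't load (the filename is illustrative; the commands use the same Airflow 1.x CLI as the rest of these notes):
> python3 etl_pipeline.py   # surfaces Python syntax errors in the DAG file
> airflow list_dags         # a DAG that fails to import will not show up in the list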
- SLAs and reporting in Airflow
- An SLA is the amount of time a task or a DAG should require to run
- An SLA Miss is any time the task / DAG does not meet the expected timing
- General reporting can be done via the email options in default_args
from airflow.contrib.sensors.file_sensor import FileSensor

# Wait for salesdata.csv to appear, checking every 5 minutes
file_sensor_task = FileSensor(task_id='file_sense',
                              filepath='salesdata.csv',
                              poke_interval=300,
                              dag=sales_report_dag)

init_sales_cleanup >> file_sensor_task >> generate_report
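As a sketch of the other sensor arguments (values are illustrative, not part of the original example), the same sensor could give up its worker slot between checks and fail after an hour without the file:
wait_for_file = FileSensor(task_id='wait_for_file',
                           filepath='salesdata.csv',
                           poke_interval=300,
                           mode='reschedule',
                           timeout=3600,
                           dag=sales_report_dag)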
from datetime import datetime, timedelta

# An SLA in default_args applies to every task in the DAG
default_args = {
    'sla': timedelta(minutes=20),
    'start_date': datetime(2020, 2, 20)
}
dag = DAG('sla_dag', default_args=default_args)

# An SLA on a specific task applies only to that task
task1 = BashOperator(task_id='sla_task',
                     bash_command='runcode.sh',
                     sla=timedelta(seconds=30),
                     dag=dag)
# Email reporting options within default_args
default_args = {
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'email_on_success': True,
    ...
}
Build a production-quality workflow in Airflow
- Working with templates
- Templates are created using the Jinja templating language
- Allow substituting information during a DAG run
- Variables: Airflow built-in runtime variables
- Provide assorted information about DAG runs, tasks, and even the system configuration
- Macros provide various useful objects / methods for Airflow templates
- Branching provides conditional logic
- Takes a python_callable that returns the next task id (or list of ids) to follow
- Running DAGs and Tasks
> airflow trigger_dag -e <date> <dag_id>
> airflow run <dag_id> <task_id> <date>
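Filled in with illustrative ids and a date (not values defined in these notes):
> airflow trigger_dag -e 2020-01-10 etl_pipeline
> airflow run etl_pipeline pull_file 2020-01-10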
templated_command="""
echo "Reading {{ params.filename }}"
"""
t1 = BashOperator(task_id='template_task',
bash_command=templated_command,
params={'filename': 'file1.txt'},
dag=example_dag)
templated_command="""
{% for filename in params.filenames %}
echo "Reading {{ filename }}"
{% endfor %}
"""
t1 = BashOperator(task_id='template_task',
bash_command=templated_command,
params={'filenames': ['file1.txt', 'file2.txt']},
dag=example_dag)
text_with_vars="""
Execution Date: {{ ds }} # YYYY-MM-DD
Execution Date, no dashes: {{ ds_nodash }} # YYYYMMDD
Previous Execution date: {{ prev_ds }} # YYYY-MM-DD
Prev Execution date, no dashes: {{ prev_ds_nodash }} # YYYYMMDD
DAG object: {{ dag }}
"""
text_with_macros="""
{{ macros.datetime }} : The datetime.datetime object
{{ macros.timedelta }} : The timedelta object
{{ macros.uuid }} : Python's uuid object
{{ macros.ds_add('2020-04-15', 5) }} : Adds 5 days to the date, giving 2020-04-20
"""

from airflow.operators.python_operator import BranchPythonOperator

# Return the task_id of the branch to follow based on the execution date
def branch_test(**kwargs):
    if int(kwargs['ds_nodash']) % 2 == 0:
        return 'even_day_task'
    else:
        return 'odd_day_task'

branch_task = BranchPythonOperator(task_id='branch_task',
                                   dag=dag,
                                   provide_context=True,
                                   python_callable=branch_test)

start_task >> branch_task >> even_day_task >> even_day_task2
branch_task >> odd_day_task >> odd_day_task2