Summary of "Introduction to Airflow in Python" from Datacamp.Org (https://gist.github.com/misho-kr/873ddcc2fc89f1c96414de9e0a58e0fe)

A guide to the basic concepts of Airflow and how to implement data engineering workflows in production

By Mike Metzger, Data Engineer Consultant @ Flexible Creations

Intro to Airflow

An introduction to the components of Apache Airflow and why to use them

  • Airflow is a platform to program data engineering workflows
    • Creation, scheduling, monitoring
    • Implement programs in any language, but workflows are written in Python
  • Implements workflows as Directed Acyclic Graphs (DAGs)
    • Represents the set of tasks that make up your workflow
    • Made up of components (typically tasks) to be executed, such as operators, sensors, etc.
    • Contain dependencies defined explicitly or implicitly
  • Accessed via code, command line, or the web interface
from airflow.models import DAG
from datetime import datetime

etl_dag = DAG(
  dag_id='etl_pipeline',
  default_args={"start_date": "2020-01-08"}
)

default_arguments = {
  'owner': 'jdoe',
  'email': '[email protected]',
  'start_date': datetime(2020, 1, 20)
}

etl_dag = DAG( 'etl_workflow', default_args=default_arguments )
> airflow run example-etl download-file 2020-01-10

Implementing Airflow DAGs

Basics of implementing Airflow DAGs using operators, tasks, and scheduling

  • Operators represent a single task in a workflow and (usually) run independently
    • May not run in the same location / environment
    • Difficult to run tasks with elevated privileges
  • Task dependencies define a given order of task completion
    • upstream task means executed before
    • downstream task means executed after
  • BashOperator executes a given Bash command or script
  • PythonOperator executes a Python function / callable
    • Supports arguments to tasks
    • Use the op_kwargs dictionary
  • EmailOperator sends an email
    • Requires the Airflow system to be configured with email server details
  • DAG Run is a specific instance of a workflow at a point in time
    • Can be run manually or via schedule_interval
    • Schedule attributes -- start_date, end_date, max_tries, schedule_interval
      • schedule_interval can be defined via cron style syntax or via built-in presets
      • @hourly, @daily, ..., @once, None
    • Maintain state for each workflow and the tasks within
      • running, failed, success
    • Airflow will schedule the task at start_date + schedule_interval (see the sketch below)
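
A minimal sketch of a scheduled DAG (the dag_id and interval here are illustrative, not from the course): this DAG would run daily at 12:30, with the first run occurring one schedule_interval after start_date.

from airflow.models import DAG
from datetime import datetime

# Runs every day at 12:30 (cron syntax); a preset such as '@daily' could be used instead
report_dag = DAG(
  dag_id='daily_report',
  default_args={'start_date': datetime(2020, 2, 20)},
  schedule_interval='30 12 * * *'
)
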
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator

example_task = BashOperator(task_id='bash_ex',
                            bash_command='echo 1',
                            dag=dag)
bash_task = BashOperator(task_id='clean_addresses',
                         bash_command='cat addresses.txt | awk "NF==10" > cleaned.txt',
                         dag=dag)

import time

def sleep(length_of_time):
  time.sleep(length_of_time)
  
sleep_task = PythonOperator(
  task_id='sleep',
  python_callable=sleep,
  op_kwargs={'length_of_time': 5},
  dag=example_dag
)

email_task = EmailOperator(
  task_id='email_sales_report',
  to='[email protected]',
  subject='Automated Sales Report',
  html_content='Attached is the latest sales report',
  files=['latest_sales.xlsx'],
  dag=example_dag
)

example_task >> bash_task << sleep_task
bash_task >> email_task

Maintaining and monitoring Airflow workflows

Airflow components such as sensors and executors, plus how to monitor and troubleshoot Airflow workflows

  • A sensor is an operator that waits for a certain condition to be true
    • mode=poke -- the default, checks the condition repeatedly
    • mode=reschedule -- give up the task slot and try again later
    • poke_interval -- how long to wait between checks
    • timeout -- how long to wait before failing the task
  • FileSensor, HttpSensor, SqlSensor, ExternalTaskSensor
    • Many others in airflow.sensors and airflow.contrib.sensors
  • Airflow executors run tasks (see the command-line check after the sensor example below)
    • Different executors handle running the tasks differently
    • SequentialExecutor -- default executor, runs one task at a time
    • LocalExecutor -- runs on a single system, treats tasks as processes
    • CeleryExecutor -- multiple worker systems, uses a Celery backend as task manager
  • Debugging and troubleshooting in Airflow
    • DAG won't run on schedule
    • DAG won't load
    • Syntax errors
  • SLA and reporting in Airflow
    • The amount of time a task or a DAG should require to run
    • SLA Miss is any time the task / DAG does not meet the expected timing
  • General reporting -- e.g. email alerts on failure / retry / success via default_args (see below)
from airflow.contrib.sensors.file_sensor import FileSensor

file_sensor_task = FileSensor(task_id='file_sense',
                              filepath='salesdata.csv',
                              poke_interval=300,
                              dag=sales_report_dag)
init_sales_cleanup >> file_sensor_task >> generate_report
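
A quick command-line troubleshooting sketch (assuming an Airflow 1.x install with the default AIRFLOW_HOME of ~/airflow; dag_file.py is a placeholder): list the DAGs Airflow recognizes, run a DAG file through Python to surface syntax errors, and grep airflow.cfg to see which executor is configured.

> airflow list_dags
> python3 ~/airflow/dags/dag_file.py
> grep "executor = " ~/airflow/airflow.cfg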

from datetime import datetime, timedelta

default_args={
  'sla': timedelta(minutes=20),
  'start_date': datetime(2020,2,20)
}
dag = DAG('sla_dag', default_args=default_args)

task1 = BashOperator(task_id='sla_task',
  bash_command='runcode.sh',
  sla=timedelta(seconds=30),
  dag=dag)

default_args={
  'email': ['[email protected]'],
  'email_on_failure': True,
  'email_on_retry': False,
  'email_on_success': True,
  ...
}

Building production pipelines in Airflow

Build a production quality workflow in Airflow

  • Working with templates
    • Created using the Jinja templating language
    • Allow substituting information during a DAG run
  • Variables
    • Airflow built-in runtime variables
    • Provide assorted information about DAG runs, tasks, and even the system configuration
  • Macros provide various useful objects / methods for Airflow templates
  • Branching provides conditional logic
    • Takes a python_callable to return the next task id (or list of ids) to follow
  • Running DAGs and Tasks
> airflow trigger_dag -e <date> <dag_id>
> airflow run <dag_id> <task_id> <date>
templated_command="""
echo "Reading {{ params.filename }}"
"""
t1 = BashOperator(task_id='template_task',
                  bash_command=templated_command,
                  params={'filename': 'file1.txt'},
                  dag=example_dag)

templated_command="""
{% for filename in params.filenames %}
echo "Reading {{ filename }}"
{% endfor %}
"""
t1 = BashOperator(task_id='template_task',
                  bash_command=templated_command,
                  params={'filenames': ['file1.txt', 'file2.txt']},
                  dag=example_dag)

text_with_vars="""
Execution Date: {{ ds }} # YYYY-MM-DD
Execution Date, no dashes: {{ ds_nodash }} # YYYYMMDD
Previous Execution date: {{ prev_ds }} # YYYY-MM-DD
Prev Execution date, no dashes: {{ prev_ds_nodash }} # YYYYMMDD
DAG object: {{ dag }}
"""

text_with_macros="""
{{ macros.datetime }} : The datetime.datetime object
{{ macros.timedelta }} : The timedelta object
{{ macros.uuid }} : Python's uuid object
{{ macros.ds_add('2020-04-15', 5) }} : Adds 5 days to the given date
"""

def branch_test(**kwargs):
  if int(kwargs['ds_nodash']) % 2 == 0:
    return 'even_day_task'
  else:
    return 'odd_day_task'
  
from airflow.operators.python_operator import BranchPythonOperator

branch_task = BranchPythonOperator(task_id='branch_task',
                                   dag=dag,
                                   provide_context=True,
                                   python_callable=branch_test)
                                   
start_task >> branch_task >> even_day_task >> even_day_task2
branch_task >> odd_day_task >> odd_day_task2
"""