Apache Airflow / Cloud Composer

Airflow setup using Docker

  1. Install Docker Community Edition (CE) on your workstation. Depending on the OS, you may need to configure your Docker instance to use at least 4.00 GB of memory for all containers to run properly. Please refer to the Resources section of the Docker for Windows or Docker for Mac documentation for more information.
  2. Install Docker Compose v1.29.1 or newer on your workstation.
  3. Fetch the compose file: curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.2/docker-compose.yaml'
  4. Set the right Airflow user:
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
  5. Initialize the database: on all operating systems, you need to run database migrations and create the first user account. The account created has the login airflow and the password airflow: docker-compose up airflow-init
  6. Run Airflow: start all services with docker-compose up

https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html

Cleaning-up the environment

The docker-compose file we prepared is a "quick-start" one. It is not intended for production use and it has a number of caveats, one of them being that the best way to recover from any problem is to clean up and restart from scratch. The best way to do that is to:

  • Run docker-compose down --volumes --remove-orphans in the directory where you downloaded the docker-compose.yaml file
  • Remove the whole directory where you downloaded the docker-compose.yaml file: rm -rf '<DIRECTORY>'
  • Re-download the docker-compose.yaml file
  • Re-start following the instructions from step 5 (Initialize the database)

Components of Apache Airflow

  • DAG: The Directed Acyclic Graph – a collection of all the tasks you want to run, organized in a way that shows the relationships between them. It is defined in a Python script.
  • Web Server: The user interface, built on Flask. It allows us to monitor the status of DAGs and trigger them.
  • Metadata Database: Airflow stores the status of all tasks in a database and performs all read/write operations of a workflow from here.
  • Scheduler: As the name suggests, this component is responsible for scheduling the execution of DAGs. It retrieves and updates the status of each task in the database.

User Interface

1. DAGS VIEW (default)

Overview of all DAGs: successful and failed runs, last execution time, and other useful links.

2. GRAPH VIEW

Visualization of a DAG's dependencies and their current status for a specific run.

3. TREE VIEW

Tree representation of a DAG that spans across time.

4. TASK DURATION

Total time spent on different tasks over time.

5. TASK TRIES

Number of tries for each task, plotted over time.

6. LANDING TIMES

Time each task took to finish, measured from the DAG run's scheduled time, plotted over time.

7. GANTT

Duration and overlap of the tasks within a DAG run.

8. DETAILS

Details of the DAG's configuration, such as its schedule, start date, and tags.

9. CODE

Quick way to view source code of a DAG.

10. TRIGGER DAG

Manually triggers a new run of the DAG.

Steps to write a DAG

1. Importing the Libraries

from datetime import datetime, timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators and sensors; we need these to operate!
from airflow.operators.bash import BashOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.utils.dates import days_ago

2. Defining the default DAG Arguments

  • owner: The name of the owner of the workflow; it should be alphanumeric and can contain underscores, but no spaces.
  • depends_on_past: Mark it as True if each run of your workflow depends on data from the previous run; otherwise mark it as False.
  • start_date: The start date of your workflow.
  • email: Your email address, so that you receive an email whenever a task fails for any reason.
  • retry_delay: If a task fails, how long Airflow should wait before retrying it.
# These args will get passed on to each operator. You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'pradeep',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

3. Instantiate a DAG

Create a DAG object and pass dag_id, which is the name of the DAG and must be unique. Also pass the arguments we defined in the last step, and add a description and a schedule_interval, which runs the DAG after the specified interval of time.

with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

4. Lay out the tasks in a workflow

sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    execution_timeout=timedelta(seconds=60),
    timeout=3600,
    retries=2,
    mode="reschedule",
)

task2 = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag,
)

5. Set Upstream/Downstream Dependency

sensor >> task2
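
The same dependency can be declared in several equivalent ways; a minimal sketch reusing the sensor and task2 objects defined above:

from airflow.models.baseoperator import chain

# All of the following express "run task2 after sensor has succeeded"
sensor >> task2                # bitshift, downstream
task2 << sensor                # bitshift, upstream (same dependency)
sensor.set_downstream(task2)   # explicit method call
task2.set_upstream(sensor)     # explicit method call

# chain() is handy for longer linear pipelines
chain(sensor, task2)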

Operators

  • BashOperator – used to execute bash commands on the machine it runs on
  • PythonOperator – takes any Python function as an input and calls it (this means the function should have a specific signature as well); see the sketch after this list
  • EmailOperator – sends emails using the configured SMTP server
  • SimpleHttpOperator – makes an HTTP request that can be used to trigger actions on a remote system.
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. – used to run SQL commands
  • QuboleOperator – allows users to run and get results from Presto, Hive, Hadoop, Spark commands, Zeppelin Notebooks, Jupyter Notebooks, and Data Import/Export jobs on the configured Qubole account.
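
A minimal sketch of the BashOperator and PythonOperator side by side (the task ids, callable, and message are made up for the example):

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def _print_message(**context):
    # The callable receives the task's context as keyword arguments
    print(f"Running for logical date {context['ds']}")

say_hello = BashOperator(
    task_id='say_hello',
    bash_command='echo "hello from bash"',
)

print_message = PythonOperator(
    task_id='print_message',
    python_callable=_print_message,
)

say_hello >> print_message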

Executors

Executors are the mechanism by which task instances get run. They can be built-in or custom.

Types of Built-in Executors

1. Local Executors

  • Debug Executor
  • Local Executor -- Runs on a single system -- Treats tasks as processes -- Parallelism defined by the user -- Can utilise all resources of a given host system
  • Sequential Executor -- The default Airflow executor -- Runs one task at a time -- Useful for debugging -- Not recommended for production

2. Remote Executors

  • Celery Executor -- Uses a Celery backend as task manager -- Multiple worker systems can be defined -- Is significantly more difficult to set up and configure -- Extremely powerful method for organisations with extensive workflows
  • Kubernetes Executor
  • Dask Executor
Example: Define Executor

  • Built-in: executor = KubernetesExecutor
  • Custom: executor = my_company.executors.MyCustomExecutor

Sensors

  • ExternalTaskSensor: waits for a task in a different DAG to complete execution (see the sketch after this list).
  • HivePartitionSensor: waits for a specific partition of a Hive table to be created.
  • S3KeySensor: waits for a specific key (file) to be available in an S3 bucket.
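
A minimal sketch of an ExternalTaskSensor that blocks until a task in another DAG has finished (the external DAG id, task id, and timeout values are hypothetical):

from airflow.sensors.external_task import ExternalTaskSensor

wait_for_load = ExternalTaskSensor(
    task_id='wait_for_load',
    external_dag_id='upstream_etl',        # hypothetical DAG id
    external_task_id='load_to_warehouse',  # hypothetical task id
    mode='reschedule',  # free the worker slot between pokes
    timeout=3600,
)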

Hooks

  • Hooks are interfaces to services external to the Airflow Cluster.
  • While Operators provide a way to create tasks that may or may not communicate with some external service, hooks provide a uniform interface to access external services like S3, MySQL, Hive, Qubole, etc. (see the sketch below).
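
A minimal sketch of using a hook inside a Python callable, assuming a Postgres connection with id my_postgres has already been configured in Airflow (the connection id, table name, and task id are hypothetical):

from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def _count_rows(**context):
    # The hook resolves host and credentials from the Airflow connection store
    hook = PostgresHook(postgres_conn_id='my_postgres')        # assumed connection id
    rows = hook.get_records('SELECT COUNT(*) FROM my_table;')  # hypothetical table
    print(f"Row count: {rows[0][0]}")

count_rows = PythonOperator(task_id='count_rows', python_callable=_count_rows)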

Tasks

When you create an instance of an operator in a DAG and provide it with its required parameters, it becomes a task. When Airflow executes that task for a given execution_date, it becomes a task instance. The first argument, task_id, acts as a unique identifier for the task.

# t1, t2 are examples of tasks created by instantiating operators

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
)

The precedence rules for a task are as follows:

  • Explicitly passed arguments
  • Values that exist in the default_args dictionary
  • The operator's default value, if one exists

A task must include or inherit the arguments task_id and owner, otherwise Airflow will raise an exception.
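
A small sketch of these precedence rules (the DAG id and task ids are made up): with retries=1 in default_args, a task that passes retries=3 explicitly ends up with 3 retries, while a task that passes nothing inherits 1.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

default_args = {'owner': 'pradeep', 'retries': 1}

with DAG('precedence_demo', default_args=default_args, start_date=days_ago(1)) as dag:
    # Explicitly passed argument wins: this task retries 3 times
    t_explicit = BashOperator(task_id='explicit', bash_command='date', retries=3)
    # No explicit value: falls back to default_args, so 1 retry
    t_inherited = BashOperator(task_id='inherited', bash_command='date')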

Running the Script

Let’s assume we are saving the code from the previous step in tutorial.py in the DAGs folder referenced in your airflow.cfg. The default location for your DAGs is ~/airflow/dags.

python ~/airflow/dags/tutorial.py

Folder Structure

dags

Define your workflows or DAGs.

logs

Contains logs from task execution and scheduler.

plugins

Custom plugins.

Dockerfile

All the configuration for Docker goes here.

docker-compose.yaml

This file contains several service definitions:

  • airflow-scheduler - The scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.
  • airflow-webserver - The webserver is available at http://localhost:8080.
  • airflow-worker - The worker that executes the tasks given by the scheduler.
  • airflow-init - The initialization service.
  • flower - The flower app for monitoring the environment. It is available at http://localhost:5555.
  • postgres - The database.
  • redis - The broker that forwards messages from the scheduler to the worker.
├───dags
│   ├───project_1
│   │       dag_1.py
│   │       dag_2.py
│   │
│   └───project_2
│           dag_1.py
│           dag_2.py
│
├───plugins
│   ├───hooks
│   │       pysftp_hook.py
│   │       servicenow_hook.py
│   │
│   ├───sensors
│   │       ftp_sensor.py
│   │       sql_sensor.py
│   │
│   ├───operators
│   │       servicenow_to_azure_blob_operator.py
│   │       postgres_templated_operator.py
│   │
│   └───scripts
│       ├───project_1
│       │       transform_cases.py
│       │       common.py
│       ├───project_2
│       │       transform_surveys.py
│       │       common.py
│       └───common
│               helper.py
│               dataset_writer.py
│
│   .airflowignore
│   Dockerfile
│   docker-stack-airflow.yml

XComs (cross-communications)

XComs are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines.

  • An XCom is identified by a key (essentially its name), as well as the task_id and dag_id it came from.
  • They can have any (serializable) value, but they are only designed for small amounts of data; do not use them to pass around large values, like dataframes.
  • XComs are explicitly “pushed” using xcom_push() and “pulled” using xcom_pull() to/from their storage on Task Instances.
  • Many operators will auto-push their results into an XCom key called return_value if the do_xcom_push argument is set to True (as it is by default), and @task functions do this as well.

XComs are a relative of Variables, with the main difference being that XComs are per-task-instance and designed for communication within a DAG run, while Variables are global and designed for overall configuration and value sharing.
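
A minimal sketch of an explicit push and pull between two PythonOperator tasks (the task ids and key name are made up for the example):

from airflow.operators.python import PythonOperator

def _push(ti):
    # Explicitly push a value under a custom key
    ti.xcom_push(key='row_count', value=42)

def _pull(ti):
    # Pull the value by key and producing task_id
    row_count = ti.xcom_pull(task_ids='push_count', key='row_count')
    print(f"Pulled row_count={row_count}")

push_count = PythonOperator(task_id='push_count', python_callable=_push)
report = PythonOperator(task_id='report', python_callable=_pull)

push_count >> report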

Custom Dockerfile and entrypoint script

# This Dockerfile installs some dependencies, adds two SQL scripts and runs the provided init.sh script as the entrypoint.
FROM apache/airflow:2.0.0-python3.7
USER root
# INSTALL TOOLS
RUN apt-get update \
    && apt-get install -y libaio-dev \
    && apt-get install -y postgresql-client
RUN mkdir extra
USER airflow
# COPY SQL SCRIPT
COPY scripts/airflow/check_init.sql ./extra/check_init.sql
COPY scripts/airflow/set_init.sql ./extra/set_init.sql
# ENTRYPOINT SCRIPT
COPY scripts/airflow/init.sh ./init.sh
ENTRYPOINT ["./init.sh"]

The init.sh entrypoint script waits for the database, runs migrations, creates the admin user if needed, and then starts the scheduler and webserver:

#!/usr/bin/env bash
# Wait for the database to accept connections
while ! nc -z "${DB__HOST}" "${DB__PORT}"; do
  >&2 echo "Waiting for postgres to be up and running..."
  sleep 1
done
export PGPASSWORD=${DB__PASSWORD}
export AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://${DB__USERNAME}:${DB__PASSWORD}@${DB__HOST}:${DB__PORT}/${DB__NAME}"
# check on db if admin exists
SECURITY_ALREADY_INITIALIZED=$(cat /opt/airflow/extra/check_init.sql | psql -h ${DB__HOST} -p ${DB__PORT} -U ${DB__USERNAME} ${DB__NAME} -t | xargs | head -c 1)
# Initialize db
airflow db upgrade
if [ "${SECURITY_ALREADY_INITIALIZED}" == "0" ]; then
echo "Creating admin user.."
airflow users create -r Admin -u "$SECURITY__ADMIN_USERNAME" -e "$SECURITY__ADMIN_EMAIL" -f "$SECURITY__ADMIN_FIRSTNAME" -l "$SECURITY__ADMIN_LASTNAME" -p "$SECURITY__ADMIN_PASSWORD"
cat /opt/airflow/extra/set_init.sql | psql -h ${DB__HOST} -p ${DB__PORT} -U ${DB__USERNAME} ${DB__NAME} -t
fi
# Run scheduler
airflow scheduler &
# Run webserver
exec airflow webserver