- Install Docker Community Edition (CE) on your workstation. Depending on the OS, you may need to configure your Docker instance to use 4.00 GB of memory for all containers to run properly. Please refer to the Resources section if using Docker for Windows or Docker for Mac for more information.
- Install Docker Compose v1.29.1 or newer on your workstation.
- Run
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.2/docker-compose.yaml'
- Set the right Airflow user: create the directories that will be mounted into the containers and tell docker-compose your host user id, so that files created inside the containers are owned by you
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
- Initialize the database: On all operating systems, you need to run database migrations and create the first user account. The account created has the login airflow and the password airflow.
docker-compose up airflow-init
- Running Airflow: start all services
docker-compose up
https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html
The docker-compose we prepare is a “Quick-start” one. It is not intended to be used in production and it has a number of caveats - one of them being that the best way to recover from any problem is to clean it up and restart from scratch. The best way to do that is to:
- Run docker-compose down --volumes --remove-orphans in the directory where you downloaded the docker-compose.yaml file.
- Remove the whole directory where you downloaded the docker-compose.yaml file: rm -rf '<DIRECTORY>'
- Re-download the docker-compose.yaml file.
- Re-start following the instructions from #5.
- DAG: The Directed Acyclic Graph – a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. It is defined in a Python script.
- Web Server: The user interface, built on Flask. It allows us to monitor the status of the DAGs and trigger them.
- Metadata Database: Airflow stores the status of all tasks in a database, and all read/write operations of a workflow happen here.
- Scheduler: As the name suggests, this component is responsible for scheduling the execution of DAGs. It retrieves and updates the status of each task in the database.
- DAGs View: Overview of all DAGs - successful and failed DAGs, last execution time, and other useful links.
- Graph View: Visualization of a DAG's dependencies and their current status for a specific run.
- Tree View: Tree representation of a DAG that spans across time.
- Task Duration: Total time spent on different tasks over time.
- Code View: A quick way to view the code that was used to generate the DAG.
- Gantt View: Duration and overlap of a DAG.
from datetime import datetime, timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators and sensors; we need these to operate!
from airflow.operators.bash import BashOperator
# SFTPSensor comes from the apache-airflow-providers-sftp package
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.utils.dates import days_ago
- owner: The name of the owner of the workflow; it should be alphanumeric and may contain underscores, but no spaces.
- depends_on_past: Set it to True if each run of your workflow depends on the data from the previous run (a task will then only run if its previous run succeeded); otherwise set it to False.
- start_date: The start date of your workflow.
- email: Your email address, so that you can receive an email whenever a task fails for any reason.
- retry_delay: How long to wait before retrying a failed task.
# These args will get passed on to each operator. You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'pradeep',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
Create a DAG object and pass the dag_id, which is the name of the DAG and must be unique. Pass the default arguments defined in the last step, and add a description and a schedule_interval, which runs the DAG after the specified interval of time. In the example below, the DAG first waits for a file to appear on an SFTP server and then runs a simple bash command.
with DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    sensor = SFTPSensor(
        task_id="sensor",
        path="/root/test",
        execution_timeout=timedelta(seconds=60),
        timeout=3600,
        retries=2,
        mode="reschedule",
    )
    task2 = BashOperator(
        task_id='run_after_loop',
        bash_command='echo 1',
        dag=dag,
    )
    sensor >> task2
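Note on the sensor parameters: mode="reschedule" releases the worker slot between pokes instead of blocking it while waiting, timeout=3600 bounds how long the sensor keeps waiting overall, and execution_timeout caps the runtime of each individual attempt.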
- BashOperator – used to execute bash commands on the machine it runs on
- PythonOperator – takes any Python function as an input and calls it (which means the function needs a compatible signature); see the sketch after this list
- EmailOperator – sends emails using the configured SMTP server
- SimpleHttpOperator – makes an HTTP request that can be used to trigger actions on a remote system.
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. – used to run SQL commands
- QuboleOperator – allows users to run and get results from Presto, Hive, Hadoop and Spark commands, Zeppelin Notebooks, Jupyter Notebooks, and Data Import/Export jobs on the configured Qubole account
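As an illustration of the PythonOperator, here is a minimal sketch; the callable print_context and its task_id are made up for the example, and the task is attached to the dag object created earlier:

from airflow.operators.python import PythonOperator

# Any plain Python function will do; context variables such as `ds`
# (the logical date as YYYY-MM-DD) are passed in when their names
# appear in the function signature.
def print_context(ds, **kwargs):
    print(f"Running for {ds}")
    return "done"  # auto-pushed to XCom under the key `return_value`

print_task = PythonOperator(
    task_id='print_the_context',
    python_callable=print_context,
    dag=dag,
)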
Executors are the mechanism by which task instances get run. They can be built-in or custom.
- Debug Executor
- Local Executor
  - Runs on a single system
  - Treats tasks as processes
  - Parallelism defined by the user
  - Can utilise all resources of a given host system
- Sequential Executor
  - The default Airflow executor
  - Runs one task at a time
  - Useful for debugging
  - Not recommended for production
- Celery Executor
  - Uses a Celery backend as task manager
  - Multiple worker systems can be defined
  - Is significantly more difficult to set up and configure
  - Extremely powerful method for organisations with extensive workflows
- Kubernetes Executor
- Dask Executor
The executor is configured via the executor option in the [core] section of airflow.cfg:
- Built-in: executor = KubernetesExecutor
- Custom: executor = my_company.executors.MyCustomExecutor
- ExternalTaskSensor: waits for another task (in a different DAG) to complete execution; see the sketch after this list.
- HivePartitionSensor: waits for a specific partition of a Hive table to be created.
- S3KeySensor: waits for a specific file or directory to become available in an S3 bucket.
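For example, a minimal ExternalTaskSensor sketch; the DAG id upstream_etl and task id load_data are placeholders for whatever upstream work you actually depend on:

from airflow.sensors.external_task import ExternalTaskSensor

wait_for_upstream = ExternalTaskSensor(
    task_id='wait_for_upstream',
    external_dag_id='upstream_etl',   # placeholder: the other DAG's id
    external_task_id='load_data',     # placeholder: the task to wait for
    poke_interval=60,                 # check every 60 seconds
    timeout=3600,                     # give up after an hour
    mode='reschedule',                # free the worker slot between checks
    dag=dag,
)
# By default the sensor looks at the run of the other DAG with the same
# logical date; use execution_delta if the two schedules are offset.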
- Hooks are interfaces to services external to the Airflow Cluster.
- While Operators provide a way to create tasks that may or may not communicate with some external service, hooks provide a uniform interface to access external services like S3, MySQL, Hive, Qubole, etc.
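For example, a minimal sketch of calling a hook from inside a task, assuming the apache-airflow-providers-postgres package is installed and a connection named my_postgres has been created under Admin -> Connections (the connection id, table, and task are made up here):

from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def count_rows():
    # The hook reads host and credentials from the `my_postgres`
    # connection, so none of them live in the DAG code itself.
    hook = PostgresHook(postgres_conn_id='my_postgres')
    records = hook.get_records('SELECT COUNT(*) FROM my_table')
    print(records[0][0])

count_task = PythonOperator(
    task_id='count_rows',
    python_callable=count_rows,
    dag=dag,
)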
When you create an instance of an operator in a DAG and provide it with its required parameters, it becomes a task. When Airflow executes that task for a given execution_date, it becomes a task instance. The first argument, task_id, acts as a unique identifier for the task.
# t1, t2 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
)
t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
)
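Once tasks exist, dependencies between them are declared with the bitshift operators or the equivalent set_upstream/set_downstream methods; for the two tasks above:

# t2 will run only after t1 has completed successfully; both lines are equivalent.
t1 >> t2
# t1.set_downstream(t2)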
The precedence rules for a task are as follows:
- Explicitly passed arguments
- Values that exist in the default_args dictionary
- The operator's default value, if one exists

For example, t2 above explicitly passes retries=3, which overrides the retries=1 set in default_args. A task must include or inherit the arguments task_id and owner, otherwise Airflow will raise an exception.
Let’s assume we are saving the code from the previous step in tutorial.py in the DAGs folder referenced in your airflow.cfg. The default location for your DAGs is ~/airflow/dags. Running the file is a quick sanity check: if the script does not raise an exception, the DAG contains no syntax errors.
python ~/airflow/dags/tutorial.py
- dags: Define your workflows or DAGs here.
- logs: Contains logs from task execution and the scheduler.
- plugins: Put your custom plugins here.
- All the configuration for Docker goes here.
This file contains several service definitions:
- airflow-scheduler - The scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.
- airflow-webserver - The webserver is available at http://localhost:8080.
- airflow-worker - The worker that executes the tasks given by the scheduler.
- airflow-init - The initialization service.
- flower - The flower app for monitoring the environment. It is available at http://localhost:5555.
- postgres - The database.
- redis - The redis broker that forwards messages from the scheduler to the worker.
├───dags
│   ├───project_1
│   │       dag_1.py
│   │       dag_2.py
│   │
│   └───project_2
│           dag_1.py
│           dag_2.py
│
├───plugins
│   ├───hooks
│   │       pysftp_hook.py
│   │       servicenow_hook.py
│   │
│   ├───sensors
│   │       ftp_sensor.py
│   │       sql_sensor.py
│   │
│   ├───operators
│   │       servicenow_to_azure_blob_operator.py
│   │       postgres_templated_operator.py
│   │
│   └───scripts
│       ├───project_1
│       │       transform_cases.py
│       │       common.py
│       ├───project_2
│       │       transform_surveys.py
│       │       common.py
│       └───common
│               helper.py
│               dataset_writer.py
│
├───.airflowignore
├───Dockerfile
└───docker-stack-airflow.yml
XComs are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines.
- An XCom is identified by a key (essentially its name), as well as the task_id and dag_id it came from.
- They can have any (serializable) value, but they are only designed for small amounts of data; do not use them to pass around large values, like dataframes.
- XComs are explicitly “pushed” using xcom_push() and “pulled” using xcom_pull() to/from their storage on Task Instances.
- Many operators will auto-push their results into an XCom key called return_value if the do_xcom_push argument is set to True (as it is by default), and @task functions do this as well.
XComs are a relative of Variables, with the main difference being that XComs are per-task-instance and designed for communication within a DAG run, while Variables are global and designed for overall configuration and value sharing.
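A minimal sketch of both mechanisms, assuming the dag object from the earlier example; the XCom key row_count and the Variable name environment are made up for illustration:

from airflow.models import Variable
from airflow.operators.python import PythonOperator

def push_value(ti):
    # Explicit push under a custom key; a plain `return` would be stored
    # automatically under the `return_value` key instead.
    ti.xcom_push(key='row_count', value=42)

def pull_value(ti):
    count = ti.xcom_pull(task_ids='push_value', key='row_count')
    # A Variable, by contrast, is global and not tied to any task instance.
    env = Variable.get('environment', default_var='dev')
    print(f'upstream pushed {count} rows in the {env} environment')

push = PythonOperator(task_id='push_value', python_callable=push_value, dag=dag)
pull = PythonOperator(task_id='pull_value', python_callable=pull_value, dag=dag)
push >> pull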