Skip to content

Instantly share code, notes, and snippets.

@nickefy
Last active May 17, 2022 15:35
Show Gist options
  • Save nickefy/e3020499ed9cc718e6c6aaad8e366f94 to your computer and use it in GitHub Desktop.
Save nickefy/e3020499ed9cc718e6c6aaad8e366f94 to your computer and use it in GitHub Desktop.
example dag for sensors
from airflow.exceptions import AirflowException
from airflow import models
from airflow import DAG
from datetime import datetime, timedelta
from airflow.utils.email import send_email
from dateutil.relativedelta import relativedelta
import os
schedule_interval_dag = timedelta(days=1)
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
# set your start_date : airflow will run previous dags if dags since startdate has not run
'start_date': '2019-08-30',
'email_on_failure': True,
'email_on_retry': True,
'project_id' : 'your project id',
'retries': 1,
'on_failure_callback': notify_email,
'retry_delay': timedelta(minutes=5),
}
with models.DAG(
dag_id='dag name',
# Continue to run DAG once per day
schedule_interval = schedule_interval_dag,
catchup = True,
default_args=default_dag_args) as dag:
check_data_source_1 =
ExternalSensor.ExternalTaskSensor(
task_id='check_data_source_1',
external_dag_id='dag of data source 1',
external_task_id= 'last task of the dag',
execution_delta = timedelta(hours=1),
timeout = 300)
check_data_source_2 =
ExternalSensor.ExternalTaskSensor(
task_id='check_data_source_2',
external_dag_id='dag of data source 2',
external_task_id= 'last task of the dag',
execution_delta = timedelta(hours=1),
timeout = 300)
check_external_data_source_1 =
ExternalSensor.ExternalTaskSensor(
task_id='check_external_data_source_1',
external_dag_id='dag of external data source 1',
external_task_id= 'last task of the dag',
execution_delta = timedelta(hours=1),
timeout = 300)
check_external_data_source_2 =
ExternalSensor.ExternalTaskSensor(
task_id='check_external_data_source_2',
external_dag_id='dag of external data source 2',
external_task_id= 'last task of the dag',
execution_delta = timedelta(hours=1),
timeout = 300)
transform_table_1 =
# code for transfromation of table 1
check_data_source_1
check_data_source_2
check_external_data_source_1
check_external_data_source_2
transform_table_1.set_upstream([check_data_source_1,check_data_source_2,check_external_data_source_1,check_external_data_source_2])
@tyytong
Copy link

tyytong commented Jan 15, 2020

Do you encounter this error:
{external_task_sensor.py} INFO - Poking for [external_dag_id][external_task_id] on [time]...
Always...(Except for not setting timeout parameter)

@nickefy
Copy link
Author

nickefy commented Jan 17, 2020

Yes, it means that you are sensing for the wrong task/wrong execution_time

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment