Example DAG for Airflow sensors
from airflow.exceptions import AirflowException
from airflow import models
from airflow import DAG
from datetime import datetime, timedelta
from airflow.utils.email import send_email
from dateutil.relativedelta import relativedelta
import os
# Imports below were missing from the original snippet (Airflow 1.10-style paths):
from airflow.sensors import external_task_sensor as ExternalSensor
from airflow.operators.dummy_operator import DummyOperator
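
# The on_failure_callback below references notify_email; its implementation is
# not included in the original snippet, so this is a minimal sketch using
# Airflow's send_email helper with a hypothetical recipient address.
def notify_email(context):
    task_id = context['task_instance'].task_id
    dag_id = context['task_instance'].dag_id
    subject = 'Airflow alert: {} failed in {}'.format(task_id, dag_id)
    body = 'Task {} in DAG {} failed on {}.'.format(
        task_id, dag_id, context['execution_date'])
    send_email('you@example.com', subject, body)  # replace with your alert address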
schedule_interval_dag = timedelta(days=1)

default_dag_args = {
    # Setting start_date in the past starts the DAG as soon as it is
    # detected in the Cloud Storage bucket.
    # With catchup=True, Airflow backfills every run since start_date that has not yet run.
    'start_date': datetime(2019, 8, 30),
    'email_on_failure': True,
    'email_on_retry': True,
    'project_id': 'your project id',
    'retries': 1,
    'on_failure_callback': notify_email,
    'retry_delay': timedelta(minutes=5),
}
with models.DAG(
        dag_id='dag name',
        # Continue to run the DAG once per day
        schedule_interval=schedule_interval_dag,
        catchup=True,
        default_args=default_dag_args) as dag:
    # Sensors that wait for the upstream DAGs to finish before table 1 is transformed.
    # execution_delta is the difference between this DAG's execution date and the
    # upstream DAG's execution date; adjust it to match your schedules.
    check_data_source_1 = ExternalSensor.ExternalTaskSensor(
        task_id='check_data_source_1',
        external_dag_id='dag of data source 1',
        external_task_id='last task of the dag',
        execution_delta=timedelta(hours=1),
        timeout=300)

    check_data_source_2 = ExternalSensor.ExternalTaskSensor(
        task_id='check_data_source_2',
        external_dag_id='dag of data source 2',
        external_task_id='last task of the dag',
        execution_delta=timedelta(hours=1),
        timeout=300)

    check_external_data_source_1 = ExternalSensor.ExternalTaskSensor(
        task_id='check_external_data_source_1',
        external_dag_id='dag of external data source 1',
        external_task_id='last task of the dag',
        execution_delta=timedelta(hours=1),
        timeout=300)

    check_external_data_source_2 = ExternalSensor.ExternalTaskSensor(
        task_id='check_external_data_source_2',
        external_dag_id='dag of external data source 2',
        external_task_id='last task of the dag',
        execution_delta=timedelta(hours=1),
        timeout=300)
    # Placeholder for the transformation of table 1; replace the DummyOperator
    # with your actual transformation (e.g. a PythonOperator or a BigQuery job).
    transform_table_1 = DummyOperator(task_id='transform_table_1')

    # transform_table_1 runs only after all four sensors have succeeded.
    transform_table_1.set_upstream([check_data_source_1,
                                    check_data_source_2,
                                    check_external_data_source_1,
                                    check_external_data_source_2])
Q: Do you encounter this error?
{external_task_sensor.py} INFO - Poking for [external_dag_id][external_task_id] on [time]...
It happens every time (except when the timeout parameter is not set).

A: Yes, it means that you are sensing for the wrong task or the wrong execution time.
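
For reference, a minimal sketch of how execution_delta is usually derived (the schedules and names here are hypothetical): if the upstream DAG runs daily at 01:00 and the sensing DAG runs daily at 02:00, the sensor has to look back one hour to find the upstream run for the same day.

from datetime import timedelta
from airflow.sensors import external_task_sensor as ExternalSensor

# Upstream DAG is scheduled at 01:00, this DAG at 02:00 (hypothetical),
# so the upstream execution date is this DAG's execution date minus one hour.
check_upstream = ExternalSensor.ExternalTaskSensor(
    task_id='check_upstream',
    external_dag_id='upstream_dag',           # the DAG being waited on
    external_task_id='last_task_of_the_dag',  # its final task
    execution_delta=timedelta(hours=1),       # offset between the two execution dates
    timeout=300)

If the two DAGs share the same schedule, execution_delta can be omitted and the sensor pokes for the run with the same execution date.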