Skip to content

Instantly share code, notes, and snippets.

@vvgsrk
Last active March 6, 2025 19:28
Show Gist options
  • Save vvgsrk/8195023515f619b2af2c758ac965c52d to your computer and use it in GitHub Desktop.
Save vvgsrk/8195023515f619b2af2c758ac965c52d to your computer and use it in GitHub Desktop.
An example code to invoke dbt cloud sensor class
from airflow import DAG
from config.current.agm import PARENT_DAG_NAME
from config.current.data_source import job_id_in_dbt_cloud
with DAG(
dag_id=PARENT_DAG_NAME,
...
) as dag:
raw_loading ...
# Instantiate the sensor class
dbt_cloud_sensor = DBTCloudSensor()
if ENV in ['preprod', 'prod'] and job_id_in_dbt_cloud:
# Only create the DbtCloudRunJobOperator task in preprod and prod environments
trigger_dbt_cloud_job_run = dbt_cloud_sensor.dbt_cloud_run_job_operator(task_id='trigger_dbt_cloud_job_run', job_id=job_id_in_dbt_cloud, dag=dag)
else:
# In dev job_id_in_dbt_cloud is not set, so use a DummyOperator
trigger_dbt_cloud_job_run = DummyOperator(task_id='trigger_dbt_cloud_job_run', dag=dag)
all_done = DummyOperator(task_id='all_done', trigger_rule='all_success', dag=dag)
raw_loading >> trigger_dbt_cloud_job_run >> all_done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment