Skip to content

Instantly share code, notes, and snippets.

@vvgsrk
Created March 6, 2025 19:13
Show Gist options
  • Save vvgsrk/e0bd9a7182aa4e1241af868f8e1a13d4 to your computer and use it in GitHub Desktop.
Save vvgsrk/e0bd9a7182aa4e1241af868f8e1a13d4 to your computer and use it in GitHub Desktop.
dbt cloud sensor py file
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.hooks.base_hook import BaseHook
from config.current import DBT_CLOUD_CONNECTION_ID
class DBTCloudSensor:
def __init__(self):
self.dbt_cloud_connection = BaseHook.get_connection(DBT_CLOUD_CONNECTION_ID)
self.account_id = self.dbt_cloud_connection.extra_dejson.get('account_id')
def dbt_cloud_run_job_operator(self, task_id, job_id, dag):
"""
Creates and returns a DbtCloudRunJobOperator task.
"""
return DbtCloudRunJobOperator(
task_id=task_id,
job_id=job_id,
account_id=self.account_id,
dbt_cloud_conn_id=DBT_CLOUD_CONNECTION_ID,
check_interval=60,
wait_for_termination=True,
dag=dag,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment