Created
May 10, 2023 08:05
-
-
Save sungchun12/f7239abd3a5cf05fcaa152b8e94a8cec to your computer and use it in GitHub Desktop.
Simple quickstart for a dbt Core DAG that uses the BashOperator with BigQuery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is an example DAG that is a good reference for your portfolio | |
import os | |
import tempfile | |
from datetime import datetime, timedelta | |
from airflow import DAG | |
from airflow.operators.dummy_operator import DummyOperator | |
from airflow.models import Connection | |
from airflow.operators.bash import BashOperator | |
def save_keyfile_to_temp_file(gcp_conn_id):
    """Write the keyfile of an Airflow GCP connection to a temp JSON file.

    The connection is looked up with ``Connection.get_connection_from_secrets``
    and its ``keyfile_dict`` extra is written to a newly created ``.json``
    temporary file.

    Args:
        gcp_conn_id: ID of the Airflow connection that holds the keyfile.

    Returns:
        Path of the temporary keyfile. The file is not removed here, so the
        caller is responsible for cleaning it up.

    Raises:
        KeyError: If the connection extras contain no keyfile.
    """
    # Local import: the file's import header does not bring in ``json``.
    import json

    conn = Connection.get_connection_from_secrets(conn_id=gcp_conn_id)
    extras = conn.extra_dejson

    # Older releases of the Google provider store the field under a prefixed
    # name, so fall back to it when the short name is absent.
    keyfile = extras.get("keyfile_dict") or extras.get(
        "extra__google_cloud_platform__keyfile_dict"
    )
    if not keyfile:
        raise KeyError(
            f"Connection {gcp_conn_id!r} has no 'keyfile_dict' in its extras"
        )

    # The extra may hold either a JSON string or an already-parsed mapping;
    # file.write() only accepts text, so serialize anything that is not str.
    if not isinstance(keyfile, str):
        keyfile = json.dumps(keyfile)

    # mkstemp creates the file readable and writable only by the current
    # user, which is what we want for credentials.
    fd, temp_file_path = tempfile.mkstemp(suffix=".json", text=True)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(keyfile)
    return temp_file_path
# TODO: replace the conn_id with your own
GCP_CONN_ID = "your_bigquery_conn_id"

# Activate script of the virtualenv that has dbt installed.
PATH_TO_DBT_VENV = "/home/astro/dbt_venv/bin/activate"

# NOTE: this is a module-level call, so the connection lookup and the temp
# file creation happen every time this file is imported.
JSON_KEYFILE_PATH = save_keyfile_to_temp_file(GCP_CONN_ID)
# TODO: replace these vars with your own
GIT_REPO = "dbt-bigquery-example"
PROJECT_ID = "dbt-demo-386220"
GIT_BRANCH = "main"

# Shell command that clones the dbt project. The URL is built from GIT_REPO so
# the cloned directory always matches the directory the later `cd` enters, and
# it points at the canonical github.com host rather than a mirror domain.
git_clone_cmds = f"""
git clone https://github.com/sungchun12/{GIT_REPO}.git"""
# Shell steps that prepare a dbt run: clone the project, enter it, check out
# the branch, export the variables the project reads, then install packages.
# The steps are chained with `&&` so the first failure aborts the rest.
dbt_setup_cmds = "\n" + " &&\n".join(
    (
        git_clone_cmds,
        f"cd {GIT_REPO}",
        f"git checkout {GIT_BRANCH}",
        f"export PROJECT_ID={PROJECT_ID}",
        f"export DBT_GOOGLE_BIGQUERY_KEYFILE={JSON_KEYFILE_PATH}",
        "dbt deps",
    )
)

# Full command for the dbt task: the setup steps followed by seed and run.
dbt_run_cmd = "\n" + " &&\n".join((dbt_setup_cmds, "dbt seed", "dbt run")) + "\n"
# Arguments applied to every task in the DAG unless a task overrides them:
# no dependence on previous runs, no e-mail notifications, and a single retry
# attempted five minutes after a failure.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# Define the DAG: runs once a day starting 2023-05-09, without backfilling
# the intervals between the start date and the first activation.
dag = DAG(
    "dbt_example_dag",
    default_args=default_args,
    # The description names the operator and dbt commands this file actually
    # uses (BashOperator running `dbt seed` and `dbt run`).
    description="An example DAG with extract and load tasks and a dbt seed/run step using BashOperator",
    schedule_interval=timedelta(days=1),
    catchup=False,
    start_date=datetime(2023, 5, 9),
)
# DummyOperator placeholders for the extract and load steps; both are
# attached to the DAG and stand in for real extract/load logic.
extract_task, load_task = (
    DummyOperator(task_id=step_id, dag=dag)
    for step_id in ("extract_task", "load_task")
)
# Run dbt through bash: activate the dbt virtualenv, then execute the setup,
# seed and run commands. The activate-script path is handed to the shell via
# `env` and read there as $PATH_TO_DBT_VENV.
dbt_run_task = BashOperator(
    task_id="dbt_run_task",
    bash_command=f"source $PATH_TO_DBT_VENV && {dbt_run_cmd}",
    env={"PATH_TO_DBT_VENV": PATH_TO_DBT_VENV},
    # Attach the task to the DAG explicitly, like the other tasks in this
    # file, instead of relying on the dependency operator to do it implicitly.
    dag=dag,
)

# Define the task dependencies
extract_task >> load_task >> dbt_run_task
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
FROM quay.io/astronomer/astro-runtime:7.4.2

# Install the apache-airflow-providers-google package.
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir apache-airflow-providers-google

# Switch to root user for installing git
USER root

# Install git and clean the APT cache in the same RUN instruction: files
# removed in a later layer would still take up space in the earlier one.
RUN apt-get update && \
    apt-get install -y git && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Switch back to the original user
USER astro

# Create a Python virtual environment and install dbt-bigquery in it, so that
# dbt's dependencies stay separate from the Airflow installation.
ENV DBT_VENV_PATH=/home/astro/dbt_venv
RUN python -m venv $DBT_VENV_PATH && \
    . $DBT_VENV_PATH/bin/activate && \
    pip install --no-cache-dir dbt-bigquery==1.5.0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Evidence it works