Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save dumkydewilde/59ae86e933fe5825730b6918264eb07d to your computer and use it in GitHub Desktop.
Save dumkydewilde/59ae86e933fe5825730b6918264eb07d to your computer and use it in GitHub Desktop.
Custom dbt materialization for Snowflake Streamlit Apps
{% materialization streamlit, adapter='snowflake', supported_languages = ['python'] %}
{% set original_query_tag = set_query_tag() %}
{% set identifier = model['alias'] %}
{% set app_name = config.get('app_name', identifier) %}
{% set stage_name = config.get('stage', 'streamlit_apps') %}
{% set create_stage = config.get('create_stage', true) %}
{% set stage_id = [database, schema, stage_name] | join('.') %}
{% set include_files = config.get('include_files', []) %}
{% if config.get('include_manifest', true) %}
{{ include_files.append('target/manifest.json') }}
{% endif %}
{%- set existing_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(
identifier=identifier,
schema=schema,
database=database,
type='table'
) -%}
{% set main_file = config.get('main_file', model.path) %}
{{ run_hooks(pre_hooks) }}
{% if existing_relation is not none and flags.FULL_REFRESH %}
{% call statement('drop_statement') %}
{{ log("Dropping app and stage", info=True)}}
DROP STREAMLIT IF EXISTS {{ target_relation }};
DROP STAGE IF EXISTS {{ stage_id }};
{% endcall %}
{% endif %}
{% if create_stage %}
{% call statement('create_stage') %}
{{ log('Creating stage: ' ~ stage_id, info=True) }}
CREATE OR ALTER STAGE {{ stage_id }}
DIRECTORY = (ENABLE = TRUE)
COMMENT = 'Stage for Streamlit application: {{ app_name }}';
{% endcall %}
{% endif %}
{% call statement('upload_files') %}
USE {{ database }}.{{ schema }};
PUT 'file://{{ model.original_file_path }}' @{{ stage_id }}/{{ app_name }}
OVERWRITE = TRUE
AUTO_COMPRESS = FALSE;
{% for file in config.get('include_files', []) %}
{% set dest = ([stage_name, app_name] + file.split('/')[:-1]) | join('/') %}
{% if 'environment.yml' in file %}
{% set dest = [stage_name, app_name, 'environment.yml'] | join('/') %}
{% endif %}
{{ log('Uploading file: ' ~ file ~ ' to: ' ~ database ~ '.' ~ schema ~'.' ~ dest, info=True) }}
PUT 'file://{{ file }}' @{{ dest }}
OVERWRITE = TRUE
AUTO_COMPRESS = FALSE;
{% endfor %}
{% for file in config.get('pages', []) %}
{% set dest = [stage_name, app_name, 'pages'] | join('/') %}
{{ log('Uploading page: ' ~ file ~ ' to: ' ~ database ~ '.' ~ schema ~'.' ~ dest, info=True) }}
PUT 'file://{{ file }}' @{{ dest }}
OVERWRITE = TRUE
AUTO_COMPRESS = FALSE;
{% endfor %}
{% endcall %}
{% call statement('main') %}
CREATE OR REPLACE STREAMLIT {{ target_relation }}
FROM @{{ stage_id }}/{{ app_name }}
MAIN_FILE = '{{ model.original_file_path.split("/") | last | replace("/", "") }}'
QUERY_WAREHOUSE = '{{ config.get("warehouse", target.warehouse) }}';
{% endcall %}
{{ run_hooks(post_hooks) }}
{% do unset_query_tag(original_query_tag) %}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
models:
- name: source_freshness_dashboard
config:
materialized: streamlit
include_files:
- app_data/utils.py
pages:
- app_data/pages/other_documentation_page.py
stage: my_streamlit_apps
import streamlit as st
from app_data.utils import get_relation_name_from_manifest, get_or_create_session
session = get_or_create_session()
st.set_page_config(
page_title="Customer Example Page",
page_icon="🔭",
layout="wide"
)
customer_count = session.sql(f"select count(*) as ct from {get_relation_name_from_manifest('dim_customer')}").collect()
st.metric(label="Total Customers", value=customer_count[0]['CT'])
def model(dbt, session):
return pd.Dataframe()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment