Last active November 28, 2022 18:55
-
-
Save slopp/6b2a6a6342630fe945bc1e5498d01bff to your computer and use it in GitHub Desktop.
Snowflake and Source Assets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dagster import SourceAsset, asset, repository, AssetKey, AssetIn, with_resources, define_asset_job, AssetSelection, DailyPartitionsDefinition | |
from dagster_snowflake import build_snowflake_io_manager | |
from dagster_snowflake_pandas import SnowflakePandasTypeHandler | |
import pandas as pd | |
# Reference an existing Snowflake table with a SourceAsset: it describes data
# that already exists, which Dagster can load but this file never builds.
# Snowflake tables live inside a schema, so the AssetKey carries both parts
# as ["schema", "table"].
# The table is declared as partitioned by day; "partition_expr" names the
# column used to pick out one day's rows when a partition is loaded.
orders = SourceAsset(
    key=AssetKey(["ANALYTICS", "orders_cleaned"]),
    metadata={"partition_expr": "ORDER_DATE"},
    partitions_def=DailyPartitionsDefinition(start_date="2022-11-15"),
)
# Downstream assets refer to the SourceAsset by its key through AssetIn.
# key_prefix sets the schema the downstream asset's table is written to.
# By default the whole upstream table is loaded (SELECT *):
#   - the "columns" metadata on AssetIn narrows the selected columns
#   - row filtering (the WHERE clause) comes from partitions
@asset(
    ins={
        "orders": AssetIn(
            key=AssetKey(["ANALYTICS", "orders_cleaned"]),
            # Only pull these columns from the upstream table.
            metadata={"columns": ["ORDER_DATE", "PURCHASE_PRICE", "USER_ID"]},
        )
    },
    key_prefix="ANALYTICS",
    metadata={"partition_expr": "ORDER_DATE"},
    partitions_def=DailyPartitionsDefinition(start_date="2022-11-15"),
)
def top_orders(orders: pd.DataFrame) -> pd.DataFrame:
    """Return the first 10 rows of one daily partition of ``orders``.

    The loaded rows match the partition being materialized. For example, for
    the 2022-11-15 partition the I/O manager selects ORDER_DATE,
    PURCHASE_PRICE and USER_ID from ANALYTICS.orders_cleaned, filtered on
    ORDER_DATE to that day's time window. The exact SQL is generated by the
    Snowflake I/O manager.

    The result is written to the table ANALYTICS.top_orders (schema taken from
    ``key_prefix``), partitioned by day on ORDER_DATE.
    """
    # NOTE(review): the load query described above has no ORDER BY, so which
    # 10 rows come first is not guaranteed. If "top" is meant to be the
    # highest PURCHASE_PRICE, sort first, e.g.
    # orders.sort_values("PURCHASE_PRICE", ascending=False).head(10).
    # Confirm the column-name casing the loader returns before relying on it.
    result = orders.head(10)
    return result
# A job's asset selection lists the assets Dagster materializes (here
# ANALYTICS/top_orders). The orders SourceAsset is not selected: it is only
# read as an input and is never built by this file.
top_orders_job = define_asset_job(
    "top_orders_job",
    selection=AssetSelection.keys(
        AssetKey(["ANALYTICS", "top_orders"]),
    ),
)
# Snowflake I/O manager that reads and writes pandas DataFrames.
snowflake = build_snowflake_io_manager([SnowflakePandasTypeHandler()])


@repository
def my_repo():
    """Collect the assets and the job that make up this repository.

    Both assets are bound to the configured Snowflake I/O manager: ``orders``
    so it can be loaded as an input, and ``top_orders`` so its output can be
    written.
    """
    assets = with_resources(
        [orders, top_orders],
        {
            # account, user and password are read from environment variables
            # ({"env": "VAR"}) so credentials are treated as secrets and stay
            # out of source; database and warehouse are hard-coded here.
            "io_manager": snowflake.configured(
                {
                    "database": "DEMO_DB2_BRANCH",
                    "account": {"env": "SNOWFLAKE_ACCOUNT"},
                    "user": {"env": "SNOWFLAKE_USER"},
                    "password": {"env": "SNOWFLAKE_PASSWORD"},
                    "warehouse": "TINY_WAREHOUSE",
                }
            ),
        },
    )
    # Return one flat list of definitions rather than a list nested inside a
    # list, so the result does not rely on @repository flattening it.
    return [*assets, top_orders_job]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.
The result: