Skip to content

Instantly share code, notes, and snippets.

@slopp
Last active November 28, 2022 18:55
Show Gist options
  • Save slopp/6b2a6a6342630fe945bc1e5498d01bff to your computer and use it in GitHub Desktop.
Save slopp/6b2a6a6342630fe945bc1e5498d01bff to your computer and use it in GitHub Desktop.
Snowflake and Source Assets
from dagster import SourceAsset, asset, repository, AssetKey, AssetIn, with_resources, define_asset_job, AssetSelection, DailyPartitionsDefinition
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
import pandas as pd
# `orders` references a table that already exists, so it is declared as a
# SourceAsset instead of being computed by an @asset function.
# Snowflake tables are usually addressed through a schema, so the asset key
# is written as ["schema", "table"].
# The table is declared to Dagster as partitioned by day; `partition_expr`
# names ORDER_DATE as the partitioning column.
orders = SourceAsset(
    key=AssetKey(["ANALYTICS", "orders_cleaned"]),
    metadata={"partition_expr": "ORDER_DATE"},
    partitions_def=DailyPartitionsDefinition(start_date="2022-11-15"),
)
# Downstream assets use AssetIn to refer to the SourceAsset by key.
# The downstream asset is written to the schema named by `key_prefix`.
# The SourceAsset is loaded as SELECT * by default:
#   - specific columns can be selected through the AssetIn metadata
#   - WHERE clauses are modeled via partitions
@asset(
    ins={
        "orders": AssetIn(
            key=AssetKey(["ANALYTICS", "orders_cleaned"]),
            # limit the columns pulled from the upstream asset
            metadata={"columns": ["ORDER_DATE", "PURCHASE_PRICE", "USER_ID"]},
        )
    },
    key_prefix="ANALYTICS",
    metadata={"partition_expr": "ORDER_DATE"},
    partitions_def=DailyPartitionsDefinition(start_date="2022-11-15"),
)
def top_orders(orders: pd.DataFrame):
    """Return the first 10 rows of the upstream orders data.

    Args:
        orders: Rows loaded from the upstream ["ANALYTICS", "orders_cleaned"]
            asset, restricted to the ORDER_DATE, PURCHASE_PRICE and USER_ID
            columns. The rows are expected to match the partition being
            materialized, e.g. materializing 2022-11-15 loads only rows
            whose ORDER_DATE falls on that day (the exact SQL is generated
            by the IO manager).

    Returns:
        The first 10 rows of ``orders`` in the order they were loaded (fewer
        if the input has fewer rows). No sorting is applied here. The result
        is stored under the asset key ["ANALYTICS", "top_orders"], i.e. as
        table top_orders in the ANALYTICS schema.
    """
    # head() is purely positional, so the result does not depend on the
    # frame's index type.
    return orders.head(10)
# A job's asset selection should name the Dagster-managed assets to
# materialize (top_orders here), not the source assets they read from.
top_orders_job = define_asset_job(
    selection=AssetSelection.keys(AssetKey(["ANALYTICS", "top_orders"])),
    name="top_orders_job",
)

# Snowflake IO manager built with the pandas type handler; connection
# settings are supplied later via `.configured(...)`.
snowflake = build_snowflake_io_manager([SnowflakePandasTypeHandler()])
@repository
def my_repo():
    """Assemble the repository: the two assets bound to Snowflake, plus the job.

    Returns:
        A flat list holding the `orders` source asset and the `top_orders`
        asset (both with the Snowflake IO manager attached as "io_manager"),
        followed by `top_orders_job`.
    """
    # Snowflake config: account, user and password are read from environment
    # variables and treated as secrets via {"env": "MY_SECRET_ENV_VAR"};
    # database and warehouse are given literally.
    io_manager = snowflake.configured(
        {
            "database": "DEMO_DB2_BRANCH",
            "account": {"env": "SNOWFLAKE_ACCOUNT"},
            "user": {"env": "SNOWFLAKE_USER"},
            "password": {"env": "SNOWFLAKE_PASSWORD"},
            "warehouse": "TINY_WAREHOUSE",
        }
    )
    assets_with_io = with_resources(
        [orders, top_orders],
        {"io_manager": io_manager},
    )
    # Return one flat list of definitions rather than a list wrapped inside
    # another list, so the result does not rely on nested-list flattening.
    return [*assets_with_io, top_orders_job]
@slopp
Copy link
Author

slopp commented Nov 28, 2022

The result:

Screen Shot 2022-11-28 at 11 55 14 AM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment