Created
March 6, 2025 15:54
-
-
Save Taytay/dabe94d987401280a7863daeb98fa949 to your computer and use it in GitHub Desktop.
Dagster dbt utilities
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from typing import AbstractSet, Any, Generator, Mapping, Sequence, ValuesView | |
import dagster as dg | |
from dagster import ( | |
AssetCheckResult, | |
AssetExecutionContext, | |
AssetKey, | |
AssetMaterialization, | |
AssetObservation, | |
Config, | |
Output, | |
SourceAsset, | |
) | |
from dagster_dbt import DagsterDbtTranslator, DbtCliInvocation, DbtCliResource, get_asset_key_for_model | |
from dagster_dbt.asset_utils import get_manifest_and_translator_from_dbt_assets | |
# Loosely-typed view of the dbt manifest as handed back by dagster_dbt (string keys, arbitrary values).
DbtUntypedManifest = Mapping[str, Any]
# Loosely-typed view of a single entry taken from the manifest's "sources" section.
DbtUntypedSource = Mapping[str, Any]
def get_dbt_sources_that_are_depended_upon(dbt_assets: dg.AssetsDefinition) -> Mapping[dg.AssetKey, DbtUntypedSource]:
    """Get all dbt sources that are depended upon by something in the dbt manifest.

    This is helpful if you have more sources defined in dbt than you actually use, and you only want to see
    assets in your DAG that are actually used.

    Args:
        dbt_assets: The AssetsDefinition returned from @dbt_assets

    Returns:
        Mapping[AssetKey, DbtUntypedSource]: The manifest entry of every dbt source whose asset key appears
        among the dependencies of ``dbt_assets``, keyed by that asset key. Empty when the manifest has no
        sources or none of them is depended upon.

    Raises:
        ValueError: If no manifest or translator could be obtained from ``dbt_assets``.
    """
    manifest: DbtUntypedManifest
    dagster_dbt_translator: DagsterDbtTranslator
    manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets(dbt_assets=[dbt_assets])
    if not manifest or not dagster_dbt_translator:
        raise ValueError("Not a valid dbt assets definition. Couldn't get manifest and translator.")

    manifest_sources = manifest["sources"].values()
    if not manifest_sources:
        return {}

    # asset_deps holds one set of upstream keys per asset; flatten them into a single lookup set
    set_of_all_keys_being_depended_on: set[dg.AssetKey] = set().union(*dbt_assets.asset_deps.values())

    # A concrete dict (not the read-only Mapping protocol) because entries are added below
    asset_key_to_source_object: dict[dg.AssetKey, DbtUntypedSource] = {}
    for dbt_source in manifest_sources:
        asset_key_of_source: dg.AssetKey = dagster_dbt_translator.get_asset_key(dbt_resource_props=dbt_source)
        if asset_key_of_source in set_of_all_keys_being_depended_on:
            asset_key_to_source_object[asset_key_of_source] = dbt_source
    return asset_key_to_source_object
def create_dagster_assets_from_dbt_sources_that_are_actually_used(
    dbt_assets_definition: dg.AssetsDefinition | Sequence[dg.AssetsDefinition],
) -> list[SourceAsset]:
    """Create Dagster SourceAssets from dbt sources that are referred to in the manifest.

    This has the added benefit of calling `get_group_name` on the source object, which is not something that
    Dagster dbt's @dbt_assets decorator does for some reason.

    Example translator:
    ```py
    class MyCustomDbtTranslator(DagsterDbtTranslator):
        def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
            resource_type: str = dbt_resource_props["resource_type"]
            # Respect whatever the user has set as the group name in the dbt model, if they have set it
            meta_group_name: str = dbt_resource_props.get("meta", {}).get("dagster", {}).get("group_name")
            if meta_group_name:
                return meta_group_name
            if resource_type == "source":
                return "my_custom_source_group_name"
            ...
    ```

    Sources whose dbt ``meta.dagster.asset_key`` is set are skipped, since they are expected to be
    defined/materialized elsewhere. A source that is depended upon by several of the given definitions
    is returned only once.

    Args:
        dbt_assets_definition: A single AssetsDefinition or a sequence of AssetsDefinitions

    Returns:
        List of SourceAsset objects representing the dbt sources that are actually depended upon
    """
    # Convert single asset definition to a list if needed
    dbt_assets_list: list[dg.AssetsDefinition] = (
        [dbt_assets_definition]
        if isinstance(dbt_assets_definition, dg.AssetsDefinition)
        else list(dbt_assets_definition)
    )

    source_assets_ret_val: list[SourceAsset] = []
    # Keys already emitted, so a source shared by several definitions yields one SourceAsset
    seen_asset_keys: set[dg.AssetKey] = set()

    for asset_def in dbt_assets_list:
        # Resolve the translator per definition instead of once for the whole list.
        # NOTE(review): get_manifest_and_translator_from_dbt_assets appears to require exactly one
        # definition per call in current dagster_dbt releases — confirm against the installed version.
        _manifest: DbtUntypedManifest
        dagster_dbt_translator: DagsterDbtTranslator
        _manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets([asset_def])

        dbt_sources_that_are_depended_upon: Mapping[dg.AssetKey, DbtUntypedSource] = (
            get_dbt_sources_that_are_depended_upon(asset_def)
        )
        for asset_key, dbt_source_object in dbt_sources_that_are_depended_upon.items():
            if asset_key in seen_asset_keys:
                continue
            dagster_asset_key_in_dbt_meta = dbt_source_object.get("meta", {}).get("dagster", {}).get("asset_key", None)
            if dagster_asset_key_in_dbt_meta:
                # This is a dagster asset already, so it's expected to be defined/materialized elsewhere.
                continue
            seen_asset_keys.add(asset_key)
            source_assets_ret_val.append(
                SourceAsset(
                    key=asset_key,
                    group_name=dagster_dbt_translator.get_group_name(dbt_source_object),
                    metadata=dagster_dbt_translator.get_metadata(dbt_source_object),
                )
            )
    return source_assets_ret_val
def get_asset_key_for_dbt_source_table(
    dbt_assets: dg.AssetsDefinition | Sequence[dg.AssetsDefinition], source_name: str, table_name: str
) -> AssetKey:
    """Returns the corresponding Dagster asset key for a dbt source for one of its tables.

    (get_asset_key_for_source, which is built into dagster_dbt, only works for a single table)

    Each definition in ``dbt_assets`` is searched in order; the first one whose manifest contains exactly
    one matching source table wins.

    Args:
        dbt_assets: One AssetsDefinition, or a sequence of them, produced by @dbt_assets.
        source_name (str): The name of the dbt source.
        table_name (str): The name of the table in the source.

    Returns:
        AssetKey: The corresponding Dagster asset key.

    Raises:
        KeyError: If no provided definition contains exactly one table matching the source and table name.

    Examples:
        .. code-block:: python

            from dagster import asset
            from dagster_dbt import dbt_assets

            @dbt_assets(manifest=...)
            def all_dbt_assets():
                ...

            @asset(key=get_asset_key_for_dbt_source_table([all_dbt_assets], "my_source", "my_table_name"))
            def upstream_python_asset():
                ...
    """
    if isinstance(dbt_assets, dg.AssetsDefinition):
        dbt_assets = [dbt_assets]

    for dbt_asset_definition in dbt_assets:
        try:
            # Look only at the definition for this iteration (previously the whole list was passed
            # on every pass, leaving the loop variable unused).
            manifest, dagster_dbt_translator = get_manifest_and_translator_from_dbt_assets([dbt_asset_definition])

            matching_source_nodes = [
                value for value in manifest["sources"].values() if value["source_name"] == source_name
            ]
            if not matching_source_nodes:
                raise KeyError(f"Could not find a dbt source with name: {source_name}")

            matching_source_entry: list[dict[str, Any]] = [
                source_entry for source_entry in matching_source_nodes if source_entry["name"] == table_name
            ]
            if not matching_source_entry:
                raise KeyError(f"Could not find a dbt table with name: {table_name}")
            if len(matching_source_entry) > 1:
                raise KeyError(f"Found multiple dbt tables with name: {table_name}")

            return dagster_dbt_translator.get_asset_key(matching_source_entry[0])
        except KeyError as e:
            # Expected while searching several definitions: a miss in this one just means we try the next
            print(
                f"[yellow]Could not find a dbt source table with source name: {source_name} and table name: {table_name}. Error: {e}[/yellow]"
            )
            continue

    raise KeyError(
        f"Could not find a dbt source table with source name: {source_name} and table name: {table_name} "
        "in the provided dbt assets"
    )
def add_dagster_configs_to_dbt_args(dbt_args: list[str], config: Sequence[Config]) -> list[str]:
    """Append a ``--vars`` argument built from the given configs to a dbt argument list.

    The ``model_dump()`` of every config is merged into a single dict (later configs overwrite earlier
    ones on duplicate keys), serialized to JSON and appended as ``--vars <json>``.

    Args:
        dbt_args: The dbt CLI arguments so far. This list is modified in place.
        config: The configs to forward as dbt vars. ``None`` or an empty sequence adds nothing.

    Returns:
        The same ``dbt_args`` list object that was passed in, for convenience.
    """
    if config:
        merged_config: dict[str, Any] = {}
        for c in config:
            merged_config.update(c.model_dump())
        dbt_args.extend(["--vars", json.dumps(merged_config)])
    print("Full dbt args with config: ", dbt_args)
    return dbt_args
def invoke_dbt_build(
    context: AssetExecutionContext, dbt: DbtCliResource, config: Sequence[Config] | None = None
) -> Generator[Output[Any] | AssetMaterialization | AssetObservation | AssetCheckResult, None, None]:
    """Run ``dbt build`` through the dbt CLI resource and stream the resulting Dagster events.

    After the event stream is exhausted, the ``compiled_code`` of every entry in ``run_results.json``
    is written to the context log at debug level.

    Args:
        context: The execution context handed to ``dbt.cli`` and used for logging.
        dbt: The dbt CLI resource used to launch the build.
        config: Optional configs forwarded to dbt as ``--vars`` (see add_dagster_configs_to_dbt_args).

    Yields:
        The events produced by ``dbt.cli(...).stream().fetch_column_metadata()``.
    """
    dbt_args: list[str] = ["build"]
    if config:
        # Use the returned list explicitly rather than relying on the in-place mutation
        dbt_args = add_dagster_configs_to_dbt_args(dbt_args, config)

    dbt_build_invocation: DbtCliInvocation = dbt.cli(args=dbt_args, context=context)
    yield from dbt_build_invocation.stream().fetch_column_metadata()

    run_results_json: dict[str, Any] = dbt_build_invocation.get_artifact("run_results.json")
    for result in run_results_json["results"]:
        # Not every result is guaranteed to carry compiled code; a missing key must not fail
        # the run after the build itself has already finished.
        compiled_code = result.get("compiled_code")
        if compiled_code is not None:
            context.log.debug(compiled_code)
def get_asset_key_for_model_from_any_dbt_assets_definitions(
    dbt_assets: dg.AssetsDefinition | Sequence[dg.AssetsDefinition], model_name: str
) -> dg.AssetKey:
    """Look up the Dagster asset key of a dbt model, seed, or snapshot across several definitions.

    Behaves like dagster_dbt's ``get_asset_key_for_model``, but each given definition is queried on
    its own, in order, and the first hit is returned.

    Args:
        dbt_assets (AssetsDefinition): One or more AssetsDefinition objects produced by @dbt_assets.
        model_name (str): The name of the dbt model, seed, or snapshot.

    Returns:
        AssetKey: The corresponding Dagster asset key.

    Raises:
        KeyError: If none of the definitions yields an asset key for ``model_name``.

    Examples:
        .. code-block:: python

            from dagster import asset
            from dagster_dbt import dbt_assets

            @dbt_assets(manifest=...)
            def all_dbt_assets():
                ...

            @asset(deps={get_asset_key_for_model_from_any_dbt_assets_definitions([all_dbt_assets], "customers")})
            def cleaned_customers():
                ...
    """
    candidates = [dbt_assets] if isinstance(dbt_assets, dg.AssetsDefinition) else dbt_assets

    for single_definition in candidates:
        try:
            found_key: dg.AssetKey = get_asset_key_for_model([single_definition], model_name)
        except KeyError:
            # A miss in one definition is normal when searching several; move on to the next one
            continue
        if found_key:
            return found_key

    raise KeyError(f"Could not find a dbt model, seed, or snapshot with name: {model_name} in the provided dbt assets")
class BaseDbtSourceModelAssetLookupHelper:
    """Use this to create type-safe lookups for dbt sources and models

    The source and model (table) names are lower-cased on construction, and the lookup is performed
    once in ``__init__`` so that an unknown source table fails immediately with a KeyError.

    For example:
        my_source_asset = BaseDbtSourceModelAssetLookupHelper(
            dbt_asset_definitions=[my_dbt_assets_definition],
            source_name="my_source",
            model_name="my_model",
        )
        asset_key = my_source_asset.get_asset_key()
    """

    def __init__(
        self,
        dbt_asset_definitions: dg.AssetsDefinition | Sequence[dg.AssetsDefinition],
        source_name: str,
        model_name: str,
    ):
        """Store the lookup parameters and verify the source table exists.

        Raises:
            KeyError: If the source table cannot be found in the provided dbt assets.
        """
        self.dbt_asset_definitions: dg.AssetsDefinition | Sequence[dg.AssetsDefinition] = dbt_asset_definitions
        self.source_name: str = source_name.lower()
        self.model_name: str = model_name.lower()
        try:
            # Ensure it exists! The key itself is not needed here, only the failure signal.
            self.get_asset_key()
        except KeyError as err:
            # Chain explicitly so the underlying lookup failure stays attached as the cause
            raise KeyError(f"Could not find dbt source table {self} in the provided dbt assets") from err

    def get_asset_key(self) -> dg.AssetKey:
        """Return the Dagster asset key for this source table (looked up on every call)."""
        return get_asset_key_for_dbt_source_table(self.dbt_asset_definitions, self.source_name, self.model_name)

    def __str__(self) -> str:
        """Return ``<source_name>.<model_name>``."""
        return f"{self.source_name}.{self.model_name}"
class BaseDbtModelAssetLookupHelper:
    """Use this to create type-safe lookups for dbt models

    The model name is lower-cased on construction, and the lookup is performed once in ``__init__``
    so that an unknown model fails immediately with a KeyError.

    For example:
        my_model_asset = BaseDbtModelAssetLookupHelper(
            asset_definitions=[my_dbt_assets_definition],
            model_name="my_model",
        )
        asset_key = my_model_asset.get_asset_key()
    """

    def __init__(self, asset_definitions: dg.AssetsDefinition | Sequence[dg.AssetsDefinition], model_name: str):
        """Store the lookup parameters and verify the model exists.

        Raises:
            KeyError: If the model cannot be found in the provided dbt assets.
        """
        self.asset_definitions: dg.AssetsDefinition | Sequence[dg.AssetsDefinition] = asset_definitions
        self.model_name: str = model_name.lower()
        try:
            # Ensure it exists! The key itself is not needed here, only the failure signal.
            self.get_asset_key()
        except KeyError as err:
            # Chain explicitly so the underlying lookup failure stays attached as the cause
            raise KeyError(f"Could not find dbt model {self} in the provided dbt assets") from err

    def get_asset_key(self) -> dg.AssetKey:
        """Return the Dagster asset key for this model (looked up on every call)."""
        return get_asset_key_for_model_from_any_dbt_assets_definitions(self.asset_definitions, self.model_name)

    def __str__(self) -> str:
        """Return the lower-cased model name."""
        return self.model_name
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment