Skip to content

Instantly share code, notes, and snippets.

@slopp
Created November 2, 2022 19:52
Show Gist options
  • Save slopp/b1005b9351c35256b658998d1bdbb25a to your computer and use it in GitHub Desktop.
Save slopp/b1005b9351c35256b658998d1bdbb25a to your computer and use it in GitHub Desktop.
Assets vs Ops
# repository.py
from dagster import (
    AssetSelection,
    ScheduleDefinition,
    define_asset_job,
    job,
    repository,
    with_resources,
)
from dagster_dbt import dbt_cloud_resource, dbt_cloud_run_op
from dagster_fivetran import build_fivetran_assets, fivetran_resource, fivetran_sync_op
# Fivetran resource with its configuration pre-filled. Both credentials use
# the {"env": ...} config form, so the values come from the FIVETRAN_API_KEY
# and FIVETRAN_API_SECRET environment variables rather than from this file.
my_fivetran_resource = fivetran_resource.configured(
    {
        "api_key": {"env": "FIVETRAN_API_KEY"},
        "api_secret": {"env": "FIVETRAN_API_SECRET"},
    }
)
# dbt Cloud resource with its configuration pre-filled. The auth token uses
# the {"env": ...} config form (DBT_CLOUD_AUTH_TOKEN); the account id is
# hard-coded here.
my_dbt_cloud_resource = dbt_cloud_resource.configured(
    {
        "auth_token": {"env": "DBT_CLOUD_AUTH_TOKEN"},
        "account_id": 11111,
    }
)
# Assets for the tables synced by the "sfdc" Fivetran connector.
# The keyword for the table list is `destination_tables`; `table_names` is
# not a parameter of build_fivetran_assets and raises TypeError.
# Entries are written as "schema.table"; they are expected to line up with
# the dbt `sfdc` source tables (accounts, opps) — confirm against the dbt project.
fivetran_assets = build_fivetran_assets(
    connector_id="sfdc",
    destination_tables=["sfdc.accounts", "sfdc.opps"],
)
# Build the dbt assets from the configured dbt Cloud resource.
# NOTE(review): `load_assets_from_dbt_cloud_manifest` is not among the names
# imported at the top of this file, and it does not appear to be a published
# dagster_dbt function. The documented dbt Cloud loader looks like
# `load_assets_from_dbt_cloud_job(dbt_cloud=..., job_id=...)` — confirm, and
# switch once the dbt Cloud job id is known.
dbt_cloud_assets = load_assets_from_dbt_cloud_manifest(my_dbt_cloud_resource)
# Asset dependencies are inferred from the Fivetran tables, the dbt sources.yml,
# and the dbt models, so there is no need to specify them again in Dagster.
#
# dbt project:
#   sources.yml
#     - name: sfdc
#       tables:
#         - name: accounts
#         - name: opps
#
#   models/
#     sfdc/
#       total_opps.sql
# Job targeting the asset with key ["sfdc", "total_opps"], with the selection
# widened by .upstream() to also cover the assets it depends on.
total_opps_job = define_asset_job(
    name="total_opps_job",
    selection=AssetSelection.keys(["sfdc", "total_opps"]).upstream(),
)

# Schedule that launches the job on the "@daily" cron preset.
run_nightly = ScheduleDefinition(
    cron_schedule="@daily",
    job=total_opps_job,
)
# Repository entry point: exposes the Fivetran and dbt Cloud assets (with the
# resources they require bound under the "fivetran" and "dbt_cloud" keys)
# together with the nightly schedule.
@repository
def my_repo():
    # The body must be indented under the `def`; at column 0 the module does
    # not even parse (IndentationError).
    return [
        with_resources(
            fivetran_assets + dbt_cloud_assets,
            resource_defs={
                "dbt_cloud": my_dbt_cloud_resource,
                "fivetran": my_fivetran_resource,
            },
        ),
        [run_nightly],
    ]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment