@slopp
Last active November 8, 2022 19:10
Modern Data Stack Ops vs Assets
# repository.py
from dagster import (
    AssetSelection,
    ScheduleDefinition,
    define_asset_job,
    repository,
    with_resources,
)
from dagster_fivetran import fivetran_resource, build_fivetran_assets
from dagster_dbt import dbt_cloud_resource, load_assets_from_dbt_cloud_job

my_fivetran_resource = fivetran_resource.configured(
    {
        "api_key": {"env": "FIVETRAN_API_KEY"},
        "api_secret": {"env": "FIVETRAN_API_SECRET"},
    }
)

my_dbt_cloud_resource = dbt_cloud_resource.configured(
    {
        "auth_token": {"env": "DBT_CLOUD_AUTH_TOKEN"},
        "account_id": 11111,
    }
)

# Each synced table becomes a Dagster asset
fivetran_assets = build_fivetran_assets(
    connector_id="sfdc",
    destination_tables=["sfdc.accounts", "sfdc.opps"],
)

# Currently pseudo-code, release planned for mid-November
dbt_cloud_assets = load_assets_from_dbt_cloud_job(my_dbt_cloud_resource, job_id=33333)

# Asset dependencies are inferred from the Fivetran tables, dbt sources.yml,
# and dbt models -- no need to specify them again in Dagster.
#
# dbt project:
#   sources.yml
#     - name: sfdc
#       tables:
#         - name: accounts
#         - name: opps
#
#   models/
#     sfdc/
#       total_opps.sql

# A job that materializes the total_opps model plus everything upstream of it
total_opps_job = define_asset_job(
    name="total_opps_job",
    selection=AssetSelection.keys(["sfdc", "total_opps"]).upstream(),
)

run_nightly = ScheduleDefinition(job=total_opps_job, cron_schedule="@daily")

@repository
def my_repo():
    return [
        *with_resources(
            fivetran_assets + dbt_cloud_assets,
            resource_defs={
                "dbt_cloud": my_dbt_cloud_resource,
                "fivetran": my_fivetran_resource,
            },
        ),
        run_nightly,
    ]
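
To try the asset graph locally, the Fivetran assets can be materialized ad hoc with Dagster's `materialize` helper. A minimal sketch, assuming the definitions above live in a repository.py module; the test_assets.py filename is hypothetical:

# test_assets.py
from dagster import materialize
from repository import fivetran_assets, my_fivetran_resource

# Kick off an ad-hoc Fivetran sync and record the resulting table materializations
result = materialize(
    fivetran_assets,
    resources={"fivetran": my_fivetran_resource},
)
assert result.success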

This gist shows the difference between an op-forward view of the modern data stack and an asset-forward view.

Assets

  • Contain rich metadata
  • Easily see out-of-sync data and stale tables
  • Think in the same vocabulary as your stakeholders
  • Re-run downstream assets without re-building upstream data
  • Schedulable via cron (daily, monthly), events (via sensors), or freshness SLAs (coming soon); see the sketch after this list

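As a sketch of those selection and scheduling options (the job name and monthly cadence here are illustrative, not part of the gist):

# schedules.py (illustrative)
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

# Re-run the dbt model and anything downstream of it without
# re-syncing the upstream Fivetran tables
refresh_downstream_job = define_asset_job(
    name="refresh_downstream_job",
    selection=AssetSelection.keys(["sfdc", "total_opps"]).downstream(),
)

# Cron-based trigger; sensors and freshness SLAs are the event-driven alternatives
run_monthly = ScheduleDefinition(job=refresh_downstream_job, cron_schedule="@monthly")
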
Ops

  • Still let you synchronize the modern data stack (e.g. run Fivetran, then dbt) so you aren't just guessing when things should run
  • But do so without the benefits of assets
# repository.py
from dagster import job, ScheduleDefinition, repository
from dagster_fivetran import fivetran_resource, fivetran_sync_op
from dagster_dbt import dbt_cloud_resource, dbt_cloud_run_op

my_fivetran_resource = fivetran_resource.configured(
    {
        "api_key": {"env": "FIVETRAN_API_KEY"},
        "api_secret": {"env": "FIVETRAN_API_SECRET"},
    }
)

my_dbt_cloud_resource = dbt_cloud_resource.configured(
    {
        "auth_token": {"env": "DBT_CLOUD_AUTH_TOKEN"},
        "account_id": 11111,
    }
)

# Pin each op to a specific Fivetran connector / dbt Cloud job
sync_salesforce = fivetran_sync_op.configured(
    {"connector_id": "salesforce"}, name="sync_salesforce"
)
sync_dbt = dbt_cloud_run_op.configured({"job_id": 33333}, name="sync_dbt")

@job(
    resource_defs={
        "dbt_cloud": my_dbt_cloud_resource,
        "fivetran": my_fivetran_resource,
    }
)
def run_fivetran_dbt():
    # Ordering only: the dbt Cloud run starts after the Fivetran sync completes
    sync_dbt(start_after=sync_salesforce())

run_nightly = ScheduleDefinition(job=run_fivetran_dbt, cron_schedule="@daily")

@repository
def my_repo():
    return [run_nightly]
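
For a quick local check, the op-based job can be executed in process with Dagster's standard `execute_in_process` test helper, assuming the FIVETRAN_* and DBT_CLOUD_* environment variables are set:

# test_job.py
from repository import run_fivetran_dbt

# Runs the Fivetran sync, then the dbt Cloud job, synchronously in this process
result = run_fivetran_dbt.execute_in_process()
assert result.success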