
@jeremyyeo
Last active September 9, 2024 17:53
Building SCD-2 models using the default incremental materialization #dbt

  1. Bootstrap the source / snapshot table source_users and the initial state of our SCD-2 table dim_users by running:
dbt run -m source_users dim_users
  2. Copy the contents of dim_users_inc.sql over the contents of dim_users.sql (or just rename the files). Then run (a quick check query is sketched right after this list):
dbt run -m dim_users
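
To see the effect of each run, an ad-hoc query like the one below works. This is only an illustrative check, not part of the gist itself; the table and column names come from the models further down, and you may need to schema-qualify dim_users for your target.

-- History for every user: each superseded version gets a dbt_valid_to,
-- and the current version per user_id keeps dbt_valid_to = null.
select user_id
, status
, dbt_valid_from
, dbt_valid_to
from dim_users -- adjust / schema-qualify for your target
order by user_id, dbt_valid_from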

Demo

(Animated demo GIF: 2021-11-09 21 24 26)

-- Initial dim_users.sql: we preload this SCD-2 table with some data so we
-- can run incrementally on top of it later.
with source_data as (
select 1 as user_id, current_timestamp as updated_at, 'pending' as status union all
select 1, dateadd(minute, 10, current_timestamp), 'active' union all
select 2, dateadd(minute, 20, current_timestamp), 'pending'
)
select user_id
, updated_at
, status
, updated_at as dbt_valid_from
, lead(updated_at, 1) over (partition by user_id order by updated_at) as dbt_valid_to
from source_data
order by 1, 2
-- Copy this into `dim_users.sql` after initial bootstrapping.
{{
    config(
        materialized='incremental',
        unique_key='md5(concat(user_id, updated_at))',
        incremental_strategy='delete+insert'
    )
}}
-- New rows coming from the source, each tagged with a surrogate id
-- per (user_id, updated_at) version.
with source_rows as (
select *
, updated_at as dbt_valid_from
, null as dbt_valid_to
, md5(concat(user_id, updated_at)) as dbt__scd_id
from {{ ref('source_users') }}
)
{% if is_incremental() %}
-- Records currently "open" (dbt_valid_to is null) in the existing table.
, destination_rows as (
select *
, md5(concat(user_id, updated_at)) as dbt__scd_id
from {{ this }}
where dbt_valid_to is null
)
-- For users whose open record differs from the incoming row, the incoming
-- row's dbt_valid_from becomes the dbt_valid_to that closes the open record.
, new_valid_to as (
select d.user_id
, s.dbt_valid_from as dbt_valid_to
from source_rows s
join destination_rows d
on s.user_id = d.user_id
and s.dbt__scd_id != d.dbt__scd_id
)
-- Close out the previously open records with their new dbt_valid_to
-- (left join so unchanged users keep dbt_valid_to = null).
, add_new_valid_to as (
select d.user_id
, d.updated_at
, d.status
, d.dbt_valid_from
, n.dbt_valid_to
from destination_rows d
left join new_valid_to n
on d.user_id = n.user_id
)
select n.user_id
, n.updated_at
, n.status
, n.dbt_valid_from
, n.dbt_valid_to
from add_new_valid_to n
union
{% endif %}
-- All rows from the source; within this batch, lead() closes out superseded
-- versions and leaves dbt_valid_to null for the latest row per user_id.
select s.user_id
, s.updated_at
, s.status
, s.dbt_valid_from
, lead(s.updated_at, 1) over (partition by s.user_id order by s.updated_at) as dbt_valid_to
from source_rows s
-- source_users.sql: the source table, containing rows that are newer than
-- the data preloaded into dim_users above.
with source_data as (
select 1 as user_id, dateadd(minute, 30, current_timestamp) as updated_at, 'deleted' as status union all
select 2, dateadd(minute, 40, current_timestamp), 'active' union all
select 3, dateadd(minute, 50, current_timestamp), 'pending'
)
select * from source_data
@adinsmoor

Just a heads up, I believe the "new_valid_to" CTE should be modified to pick up a single dbt_valid_to that's the minimum of a given user_id's new records.

This also allows for handling multiple updates to a single user_id during an incremental run.

select d.digipass_id
     , min(s.dbt_valid_from) as dbt_valid_to
  from source_rows s
  join destination_rows d
    on s.digipass_id = d.digipass_id
   and s.dbt__scd_id != d.dbt__scd_id
 group by 1

@DiegoWeichafe

Applying @adinsmoor's suggestion above to this model's column names, the new_valid_to CTE becomes:

, new_valid_to as (

    select d.user_id
           ,min(s.dbt_valid_from) as dbt_valid_to
      from source_rows s
      join destination_rows d
        on s.user_id = d.user_id
       and s.dbt__scd_id != d.dbt__scd_id
      group by 1
)

@Victoriapm

@jeremyyeo one thing I also like to add in this setup is a config on the incremental model to prevent full_refresh: as with snapshots, we are looking to store history and want to avoid dropping those tables by mistake.
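
A minimal sketch of what that could look like for this model, using dbt's full_refresh config; this block is illustrative and not part of the original gist.

-- Same config as dim_users_inc.sql above, plus full_refresh=false so that
-- `dbt run --full-refresh` will not drop and rebuild this model, and the
-- stored history is not lost by mistake.
{{
    config(
        materialized='incremental',
        unique_key='md5(concat(user_id, updated_at))',
        incremental_strategy='delete+insert',
        full_refresh=false
    )
}}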
