@gnilrets
Last active October 27, 2024 12:02
dbt Snapshot Join
{% macro trange_join(left_model, left_fields, left_primary_key, right_models) %}
{#
    This macro joins two or more snapshot models together on a common key,
    producing one record per key for each distinct time range. For example,

    Given left_model:
    | {{ join_key }} | left_field | dbt_valid_from | dbt_valid_to |
    | - | - | - | - |
    | k1 | L1 | 2020-01-01 | 2020-01-05 |
    | k1 | L2 | 2020-01-05 | 2999-12-31 |

    Given right_model:
    | {{ join_key }} | right_field | dbt_valid_from | dbt_valid_to |
    | - | - | - | - |
    | k1 | R1 | 2020-01-03 | 2020-01-07 |
    | k1 | R2 | 2020-01-07 | 2999-12-31 |

    Resultant temporal range join:
    | {{ join_key }} | left_field | right_field | dbt_valid_from | dbt_valid_to |
    | - | - | - | - | - |
    | k1 | L1 | | 2020-01-01 | 2020-01-03 |
    | k1 | L1 | R1 | 2020-01-03 | 2020-01-05 |
    | k1 | L2 | R1 | 2020-01-05 | 2020-01-07 |
    | k1 | L2 | R2 | 2020-01-07 | 2999-12-31 |

    Parameters:
    * left_model - Name of the "left" or primary model involved in the join. This needs
      to be the most granular table involved in the join (the "one" in "one-to-many").
    * left_fields - An array of the fields on the left model to be included in the result.
    * left_primary_key - The primary key of the left model. Note that this is the primary
      key of the source data, not the snapshot data (so it may not be unique in the snapshot, but
      must be unique at any point in time).
    * right_models - A dictionary where the keys are the names of "right" models and the values
      are dictionaries containing the list of fields to be included in the final
      table and the join keys (see example).

    Example:
    trange_join(
        left_model='engagements',
        left_fields=engagement_fields|map('last')|list,
        left_primary_key='engagement_sfid', # Granularity of the pre-snapshot engagement table is `engagement_sfid`
        right_models={
            'partners': {
                'fields': partner_fields|map('last')|list,
                'left_on': 'partner_sfid',
                'right_on': 'partner_sfid'
            }
        }
    )

    Requirements and assumptions:
    * Models must be CTEs, or in the default search path.
    * Models must have all of the following fields defined, and they must all be non-null:
      - dbt_scd_id - unique row identifier
      - dbt_valid_from
      - dbt_valid_to (nulls must be replaced with an open-ended date like "2999-12-31 00:00:00",
        and that date must be stored in a dbt var accessible via `var('OPEN_END_DATE')`)
    * All other fields must not be shared between the models involved.
      Rename any shared names (other than join keys) prior to using this macro.
    * The final result is a CTE named `trange_final`. Select from this CTE.
    * The final result includes a `surrogate_key` field that combines left_primary_key
      with valid_from_at and may be used as a surrogate key in a dimensional model.

    Reference: https://www.oraylis.de/blog/combining-multiple-tables-with-valid-from-to-date-ranges-into-a-single-dimension
#}
{%- for right_model, right_params in right_models.items() %}
trange_unique_left_{{ right_model }} AS (
    SELECT DISTINCT
        {{ left_primary_key }},
        {{ right_params['left_on'] }} AS __left_join_key
    FROM
        {{ left_model }}
),

-- The "many" side is exploded so that it ends up as a one-to-one join.
-- This prevents "ghost" rows caused by interactions between left records that share a join key.
trange_explode_{{ right_model }} AS (
    SELECT
        {%- if left_primary_key != right_params['right_on'] %}
        left_model.{{ left_primary_key }},
        {%- endif %}
        right_model.*
    FROM
        trange_unique_left_{{ right_model }} AS left_model
    INNER JOIN
        {{ right_model }} AS right_model
        ON
            left_model.__left_join_key = right_model.{{ right_params['right_on'] }}
),
{%- endfor %}

-- Every validity boundary (from or to) seen for each left primary key
trange_valid_dates AS (
    SELECT {{ left_primary_key }}, dbt_valid_from AS valid_at FROM {{ left_model }}
    UNION
    SELECT {{ left_primary_key }}, dbt_valid_to AS valid_at FROM {{ left_model }}
    UNION
    {%- for right_model, _ in right_models.items() %}
    SELECT {{ left_primary_key }}, dbt_valid_from AS valid_at FROM trange_explode_{{ right_model }}
    UNION
    SELECT {{ left_primary_key }}, dbt_valid_to AS valid_at FROM trange_explode_{{ right_model }}
    {{ 'UNION' if not loop.last }}
    {%- endfor %}
),

-- Consecutive boundaries become candidate [valid_from_at, valid_to_at) ranges
trange_all_ranges AS (
    SELECT
        {{ left_primary_key }},
        valid_at AS valid_from_at,
        LEAD(valid_at, 1) OVER (PARTITION BY {{ left_primary_key }} ORDER BY valid_at) AS valid_to_at
    FROM
        trange_valid_dates
),

-- For each candidate range, find the left and right snapshot rows that overlap it
trange_joined_ranges AS (
    SELECT
        left_model.{{ left_primary_key }},
        left_model.dbt_scd_id AS {{ left_model }}_scd_id,
        {%- for right_model, _ in right_models.items() %}
        trange_explode_{{ right_model }}.dbt_scd_id AS {{ right_model }}_scd_id,
        {%- endfor %}
        trange_all_ranges.valid_from_at,
        trange_all_ranges.valid_to_at
    FROM
        {{ left_model }} AS left_model
    INNER JOIN
        trange_all_ranges
        ON
            left_model.{{ left_primary_key }} = trange_all_ranges.{{ left_primary_key }}
            AND left_model.dbt_valid_from < trange_all_ranges.valid_to_at
            AND left_model.dbt_valid_to > trange_all_ranges.valid_from_at
    {%- for right_model, right_params in right_models.items() %}
    LEFT JOIN
        trange_explode_{{ right_model }}
        ON
            left_model.{{ left_primary_key }} = trange_explode_{{ right_model }}.{{ left_primary_key }}
            AND left_model.{{ right_params['left_on'] }} = trange_explode_{{ right_model }}.{{ right_params['right_on'] }}
            AND trange_explode_{{ right_model }}.dbt_valid_from < trange_all_ranges.valid_to_at
            AND trange_explode_{{ right_model }}.dbt_valid_to > trange_all_ranges.valid_from_at
    {%- endfor %}
),

-- Look up the requested fields from each model by its snapshot row id
trange_final AS (
    SELECT
        {%- for left_field in left_fields %}
        left_model.{{ left_field }},
        {%- endfor %}
        {%- for right_model, right_params in right_models.items() %}
        {%- for right_field in right_params['fields'] if (right_field != right_params['left_on'] or right_params['left_on'] != right_params['right_on']) %}
        {{ right_model }}.{{ right_field }},
        {%- endfor %}
        {%- endfor %}
        trange_joined_ranges.{{ left_model }}_scd_id,
        {%- for right_model, _ in right_models.items() %}
        {{ right_model }}.dbt_scd_id AS {{ right_model }}_scd_id,
        {%- endfor %}
        {{ dbt_utils.surrogate_key([
            'left_model.' ~ left_primary_key,
            'trange_joined_ranges.valid_from_at',
        ]) }} AS surrogate_key,
        trange_joined_ranges.valid_from_at,
        trange_joined_ranges.valid_to_at,
        trange_joined_ranges.valid_to_at = {{ var('OPEN_END_DATE') }} AS is_valid
    FROM
        trange_joined_ranges
    LEFT JOIN
        {{ left_model }} AS left_model
        ON
            trange_joined_ranges.{{ left_model }}_scd_id = left_model.dbt_scd_id
    {%- for right_model, _ in right_models.items() %}
    LEFT JOIN
        {{ right_model }}
        ON
            trange_joined_ranges.{{ right_model }}_scd_id = {{ right_model }}.dbt_scd_id
    {%- endfor %}
)
{% endmacro %}
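
A minimal usage sketch, assuming the macro above is saved in the project's macros directory. It follows the requirements listed in the macro's docstring: both snapshots are wrapped in CTEs, null `dbt_valid_to` values are replaced with `var('OPEN_END_DATE')`, and the model selects from `trange_final`. The snapshot names (`engagements_snapshot`, `partners_snapshot`), the model file name, and the columns `engagement_name` and `partner_name` are hypothetical. Because the macro interpolates `var('OPEN_END_DATE')` without quoting it, the var is assumed here to hold a complete SQL literal, quotes included. The macro also calls `dbt_utils.surrogate_key`, so the dbt_utils package needs to be installed (newer dbt_utils releases replace that macro with `generate_surrogate_key`).

```sql
-- models/dim_engagements_history.sql (hypothetical model name)
WITH

-- Left model: must expose dbt_scd_id, dbt_valid_from and a non-null dbt_valid_to
engagements AS (
    SELECT
        engagement_sfid,
        partner_sfid,
        engagement_name,  -- hypothetical column
        dbt_scd_id,
        dbt_valid_from,
        -- Assumes OPEN_END_DATE renders as a full SQL literal, quotes included
        COALESCE(dbt_valid_to, {{ var('OPEN_END_DATE') }}) AS dbt_valid_to
    FROM
        {{ ref('engagements_snapshot') }}  -- hypothetical snapshot
),

-- Right model: same requirements; no column names shared with the left model
-- other than the join key
partners AS (
    SELECT
        partner_sfid,
        partner_name,  -- hypothetical column
        dbt_scd_id,
        dbt_valid_from,
        COALESCE(dbt_valid_to, {{ var('OPEN_END_DATE') }}) AS dbt_valid_to
    FROM
        {{ ref('partners_snapshot') }}  -- hypothetical snapshot
),

-- The macro renders a comma-separated chain of CTEs ending in trange_final,
-- so it slots directly into the WITH clause after the input CTEs
{{ trange_join(
    left_model='engagements',
    left_fields=['engagement_sfid', 'partner_sfid', 'engagement_name'],
    left_primary_key='engagement_sfid',
    right_models={
        'partners': {
            'fields': ['partner_name'],
            'left_on': 'partner_sfid',
            'right_on': 'partner_sfid'
        }
    }
) }}

SELECT * FROM trange_final
```

With these inputs the result should contain one row per `engagement_sfid` and distinct time range, carrying the selected fields plus `engagements_scd_id`, `partners_scd_id`, `surrogate_key`, `valid_from_at`, `valid_to_at`, and `is_valid`.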
@gnilrets
Author

gnilrets commented Feb 9, 2022

Copyright 2022, Sterling Paramore

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

@cnolanminich

Thank you so much!

@gnilrets
Author

Updated this macro to fix an issue where there is a foreign key change for a particular record. Thanks to Brian Schillaci!

@dongchris

@gnilrets hi would this join macro work as an incremental strategy that runs daily based on start and end date?

@gnilrets
Author

@dongchris - I haven't tried, but I don't see why it couldn't work.

@wesseljt

@gnilrets - really great work. Thanks!

@aking-ed

This is great. We were trying to solve this same problem

@gabriellegall

gabriellegall commented Oct 20, 2024

Awesome work! I integrated this into my project and it works perfectly!
Am I right to say that this macro only works when the FKs are on the LEFT model? For instance, if your dimensions are organized in a snowflake structure, you'd need to use the macro several times in a nested manner, right?
