Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save vvgsrk/93b022e54ebff602ead8411637077510 to your computer and use it in GitHub Desktop.
Save vvgsrk/93b022e54ebff602ead8411637077510 to your computer and use it in GitHub Desktop.
Compare row rank query result with incremental table
{#
  compare_row_rank_query_result_with_incremental_table

  Renders a query that compares two HASH_AGG fingerprints:

    1. row_rank_hash_agg_value    - computed over `history_model`, keeping only
       the newest row (by __event_ts) of every key. The key is every column
       of `history_model` whose name ends in "_id" (case-insensitive). Only
       history rows with __load_ts <= MAX(__load_ts) of `incremental_model`
       are considered.
    2. incremental_hash_agg_value - computed over every row of
       `incremental_model`, ignoring columns whose name starts with "__UUID".

  Args:
    history_model:     relation that holds the full row history.
    incremental_model: relation that holds the incrementally built rows.

  Rendered columns:
    row_rank_hash_agg_value, incremental_hash_agg_value,
    result (0 when the two fingerprints compare equal, 1 otherwise).

  Raises:
    A dbt compiler error (at execution time only) when `history_model`
    exposes no column ending in "_id", because the PARTITION BY clause
    would otherwise be rendered empty and produce invalid SQL.

  NOTE(review): the two fingerprints are built from two independently
  introspected column lists; presumably both projections must line up
  column-for-column for the comparison to be meaningful - confirm.
#}
{%- macro compare_row_rank_query_result_with_incremental_table(history_model, incremental_model) -%}
{%- set history_columns = adapter.get_columns_in_relation(history_model) -%}
{%- set inc_columns = adapter.get_columns_in_relation(incremental_model) -%}

{#- Key columns used to partition the history: every "*_id" column. -#}
{%- set key_columns = [] -%}
{%- for column in history_columns if column.name.lower().endswith('_id') -%}
    {%- do key_columns.append(column) -%}
{%- endfor -%}

{#-
  Fail fast instead of rendering "PARTITION BY ORDER BY ...".
  Restricted to `execute` so that parse-time rendering, where dbt does not
  run introspective queries and the column list is empty, is left alone.
-#}
{%- if execute and key_columns | length == 0 -%}
    {%- do exceptions.raise_compiler_error(
        "compare_row_rank_query_result_with_incremental_table: no column ending in '_id' was found in "
        ~ history_model
        ~ "; cannot build the PARTITION BY clause."
    ) -%}
{%- endif -%}

WITH ranked_rows AS (
    SELECT
        ROW_NUMBER() OVER (
            PARTITION BY
                {%- for column in key_columns %}
                "{{ column.name }}"{% if not loop.last %},{% endif %}
                {%- endfor %}
            ORDER BY __event_ts DESC
        ) AS row_rank,
        *
    FROM {{ history_model }}
    WHERE __load_ts <= (
        SELECT MAX(__load_ts)
        FROM {{ incremental_model }}
    )
),

{#- Fingerprint of the newest history row per key (row_rank = 1). -#}
row_rank_as_one AS (
    SELECT HASH_AGG(*) AS row_rank_hash_agg_value
    FROM (
        SELECT
            {%- for column in history_columns %}
            "{{ column.name }}"{% if not loop.last %},{% endif %}
            {%- endfor %}
        FROM ranked_rows
        WHERE row_rank = 1
    )
),

{#- Fingerprint of the incremental table, without its "__UUID*" columns. -#}
incremental_rows AS (
    SELECT HASH_AGG(*) AS incremental_hash_agg_value
    FROM (
        SELECT
            {%- for column in inc_columns if not column.name.startswith('__UUID') %}
            "{{ column.name }}"{% if not loop.last %},{% endif %}
            {%- endfor %}
        FROM {{ incremental_model }}
    )
),

{#-
  NOTE(review): a plain "=" is kept on purpose. If either fingerprint is
  NULL (Snowflake documents HASH_AGG over empty input as NULL - confirm),
  the comparison is not TRUE and `result` is 1, i.e. empty inputs are
  reported as a mismatch.
-#}
final AS (
    SELECT
        row_rank_hash_agg_value,
        incremental_hash_agg_value,
        CASE
            WHEN row_rank_hash_agg_value = incremental_hash_agg_value THEN 0
            ELSE 1
        END AS result
    FROM row_rank_as_one
    CROSS JOIN incremental_rows
)

SELECT *
FROM final
{% endmacro %}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment