dbt can check the freshness (i.e. how up to date the data is) of a particular source. That functionality is reserved for sources and does not apply to models. For example, suppose you tried to configure freshness on a model the way you would on a source:
# models/schemas.yml
version: 2
models:
  - name: foo
    loaded_at_field: updated_at
    freshness:
      error_after: { count: 48, period: hour }
...
You will find that neither `dbt source freshness` nor `dbt test` will actually do any "freshness checking" of our `foo` model above. One motivation for this is that in any dbt project, raw data normally comes in as sources, and your models then transform that data by selecting from those raw data sources. Essentially, if your source data is fresh, there is really no reason why you'd want to "freshness check" your models as well. Of course, given that dbt is basically just a jinja + sql compiler, the sky is the limit to what users attempt to do with it.
Now, while the built-in source freshness checks are for sources only, dbt does give end users the flexibility to create their own generic tests that they can then apply to models. Let's see if we can create a generic test that closely replicates what source freshness checks do.
Since this is a more advanced topic, there are a few key things to understand here.
Below, I will be using "fail" and "error" interchangeably. This is opposed to "warn" which is different from "fail"/"error".
- By default, a dbt test will result in a fail if the test selection includes at least 1 record.
- Also by default, a dbt test will not emit a warning - it is always fail or not fail (i.e. pass).
- It is possible to configure the severity of a generic test - such that a test emits a warning if there are X records and fails / errors if there are Y records (https://docs.getdbt.com/reference/resource-configs/severity).
- Critically, a normal generic test (the built-in ones like `not_null`, `unique`, etc. or one that a user writes themselves) looks at the "number of rows" returned in order to emit a certain severity (fail/warn/pass). This is not the same as the out-of-the-box source freshness checks, which are not concerned with the "number of rows" but rather check whether the age of the max timestamp of a certain column exceeds some error or warning threshold. All this is to say, the code path that "decides" the status of a generic test is completely different from the code path that "decides" the status of a source freshness check (which is, after all, not a "test" but entirely its own thing).
- To replicate a source freshness check, our generic test will need to support 3 types of freshness configs:
  - Only `warn_after` is configured.
  - Only `error_after` is configured.
  - Both `warn_after` and `error_after` are configured.
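The row-count rule described in the list above can be sketched in a few lines of Python. This is a simplified model for illustration only, not dbt's actual code: the names `matches` and `status_for` and the small operator parser are assumptions, and the sketch assumes the default `severity: error` with the documented defaults of `!= 0` for both `warn_if` and `error_if`.

```python
import operator

# Comparison operators as they might appear in warn_if / error_if strings.
# Two-character operators come first so that ">=" is not read as ">".
_OPS = [
    ("!=", operator.ne),
    (">=", operator.ge),
    ("<=", operator.le),
    ("=", operator.eq),
    (">", operator.gt),
    ("<", operator.lt),
]


def matches(failures: int, condition: str) -> bool:
    """Evaluate a condition such as '!=0', '=1' or '>10' against a failure count."""
    condition = condition.replace(" ", "")
    for symbol, func in _OPS:
        if condition.startswith(symbol):
            return func(failures, int(condition[len(symbol):]))
    raise ValueError(f"unsupported condition: {condition!r}")


def status_for(failures: int, warn_if: str = "!=0", error_if: str = "!=0") -> str:
    """Hypothetical helper: the error condition is checked first, then warn."""
    if matches(failures, error_if):
        return "fail"
    if matches(failures, warn_if):
        return "warn"
    return "pass"
```

With the defaults, `status_for(1)` is a fail and `status_for(0)` is a pass, so a warning is never emitted. With `warn_if="=1"` and `error_if="=2"`, one returned row becomes a warning and two rows become a failure.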
For the purposes of our replication, we will set it up so that:
- The model freshness checks will fail if the timestamp is more than 48 hours old.
- The model freshness checks will warn if the timestamp is more than 12 hours old.
- The model freshness checks will pass otherwise.
This is tested on Snowflake only.
First, let's set up our dbt project.
# dbt_project.yml
name: my_dbt_project
profile: all
config-version: 2
version: "1.0.0"
models:
  my_dbt_project:
    +materialized: table
And these are the 7 models that we will be using to test all our conditions and combinations of conditions:
-- models/bar_should_fail_with_error_config.sql
select current_timestamp() - interval '60 hour' as updated_at, 1 as id
-- models/bar_should_pass_with_error_config.sql
select current_timestamp() - interval '36 hour' as updated_at, 1 as id
-- models/bar_should_pass_with_warn_config.sql
select current_timestamp() as updated_at, 1 as id
-- models/bar_should_warn_with_warn_config.sql
select current_timestamp() - interval '36 hour' as updated_at, 1 as id
-- models/foo_should_fail.sql
select current_timestamp() - interval '60 hour' as updated_at, 1 as id
-- models/foo_should_pass.sql
select current_timestamp() as updated_at, 1 as id
-- models/foo_should_warn.sql
select current_timestamp() - interval '36 hour' as updated_at, 1 as id
And the associated schema yml file where we will apply our tests:
# models/schemas.yml
version: 2
models:
  - name: bar_should_fail_with_error_config
    tests:
      - freshness:
          loaded_at_field: updated_at
          error_after: { count: 48, period: hour }
  - name: bar_should_pass_with_error_config
    tests:
      - freshness:
          loaded_at_field: updated_at
          error_after: { count: 48, period: hour }
  - name: bar_should_pass_with_warn_config
    tests:
      - freshness:
          loaded_at_field: updated_at
          warn_after: { count: 12, period: hour }
  - name: bar_should_warn_with_warn_config
    tests:
      - freshness:
          loaded_at_field: updated_at
          warn_after: { count: 12, period: hour }
  - name: foo_should_fail
    tests: &freshness_tests
      - freshness:
          loaded_at_field: updated_at
          warn_after: { count: 12, period: hour }
          error_after: { count: 48, period: hour }
  - name: foo_should_pass
    tests: *freshness_tests
  - name: foo_should_warn
    tests: *freshness_tests
The tests on the `foo_` models make use of yaml anchors to copy the test from one model to the next without having to rewrite it.
If you pay close attention to the model code and schema yaml test declarations above, you can identify which model tests are supposed to result in which test status (fail/warn/pass). For example, the freshness test on the model `bar_should_fail_with_error_config` should error because the `updated_at` column is created with `current_timestamp() - interval '60 hour'`, which exceeds the `error_after: { count: 48, period: hour }` config.
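To double check that reasoning for all seven models, here is a small Python sketch that derives the expected status from each model's age and thresholds. The function `classify` and the `MODELS` table are illustrative names, not part of dbt; the numbers are copied from the model SQL and the schema file above.

```python
from typing import Optional


def classify(age_hours: float,
             warn_after: Optional[float] = None,
             error_after: Optional[float] = None) -> str:
    """Return 'fail', 'warn' or 'pass' for a timestamp that is age_hours old."""
    if warn_after is None and error_after is None:
        raise ValueError("need warn_after and/or error_after")
    # The error threshold takes priority over the warn threshold.
    if error_after is not None and age_hours > error_after:
        return "fail"
    if warn_after is not None and age_hours > warn_after:
        return "warn"
    return "pass"


# model name -> (age of updated_at in hours, warn_after, error_after)
MODELS = {
    "bar_should_fail_with_error_config": (60, None, 48),
    "bar_should_pass_with_error_config": (36, None, 48),
    "bar_should_pass_with_warn_config": (0, 12, None),
    "bar_should_warn_with_warn_config": (36, 12, None),
    "foo_should_fail": (60, 12, 48),
    "foo_should_pass": (0, 12, 48),
    "foo_should_warn": (36, 12, 48),
}

EXPECTED = {name: classify(*cfg) for name, cfg in MODELS.items()}

if __name__ == "__main__":
    for name, status in EXPECTED.items():
        print(f"{name}: {status}")
```

Note that the same 36 hour old timestamp passes under an `error_after` of 48 hours but warns under a `warn_after` of 12 hours, which is why two of the `bar_` models share identical SQL.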
Now, here comes the "magic", the generic test macro itself:
-- tests/generic/freshness.sql
{% test freshness(model, loaded_at_field, warn_after=none, error_after=none) %}
    {% if warn_after and error_after %}
        {# Both thresholds: 1 returned row means warn, 2 returned rows means error. #}
        {{ config(warn_if='=1', error_if='=2') }}
        with check_conditions as (
            select count(*) as num_rows, max({{ loaded_at_field }}) as last_loaded_at, 'warn' as condition
            from {{ model }}
            where {{ loaded_at_field }} < (current_timestamp() - interval '{{ warn_after.count }} {{ warn_after.period }}')
            union
            select count(*) as num_rows, max({{ loaded_at_field }}) as last_loaded_at, 'error' as condition
            from {{ model }}
            where {{ loaded_at_field }} < (current_timestamp() - interval '{{ error_after.count }} {{ error_after.period }}')
        )
    {% elif warn_after %}
        {# Warn only: at most 1 row is returned, so the error condition is never met. #}
        {{ config(warn_if='=1', error_if='=2') }}
        with check_conditions as (
            select count(*) as num_rows, max({{ loaded_at_field }}) as last_loaded_at, 'warn' as condition
            from {{ model }}
            where {{ loaded_at_field }} < (current_timestamp() - interval '{{ warn_after.count }} {{ warn_after.period }}')
        )
    {% elif error_after %}
        {# Error only: 1 returned row means error. #}
        {{ config(error_if='=1') }}
        with check_conditions as (
            select count(*) as num_rows, max({{ loaded_at_field }}) as last_loaded_at, 'error' as condition
            from {{ model }}
            where {{ loaded_at_field }} < (current_timestamp() - interval '{{ error_after.count }} {{ error_after.period }}')
        )
    {% else %}
        {% do exceptions.raise_compiler_error('Freshness test applied to model ' ~ model ~ ' but missing warn_after/error_after configs.') %}
    {% endif %}
    select * from check_conditions
    where num_rows > 0
{% endtest %}
The general idea here is that, for tests that have both `warn_after` and `error_after` configured, if a timestamp column exceeds the `error_after` config, then it must also exceed the `warn_after` config (assuming the `warn_after` threshold is the shorter of the two). This means that:
- If the timestamp exceeds the `error_after` threshold, our test selection returns 2 rows.
- If the timestamp exceeds only the `warn_after` threshold, our test selection returns 1 row.
- And 0 rows otherwise.
Note that we also make use of the `warn_if` / `error_if` test configurations in order to emit the right status (fail/warn/pass).
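The 2 / 1 / 0 row behaviour can be reproduced locally without a warehouse. The sketch below is an approximation, not the compiled dbt SQL: it runs the same shape of query against an in-memory SQLite table, with the Snowflake `current_timestamp() - interval ...` expressions replaced by cutoffs computed in Python from a fixed `NOW`. The table name `model` and the helper `result_rows` are assumptions for illustration.

```python
import sqlite3
from datetime import datetime, timedelta

# Fixed "current time" so the result is repeatable.
NOW = datetime(2024, 1, 1, 12, 0, 0)

QUERY = """
with check_conditions as (
    select count(*) as num_rows, max(updated_at) as last_loaded_at, 'warn' as condition
    from model
    where updated_at < :warn_cutoff
    union
    select count(*) as num_rows, max(updated_at) as last_loaded_at, 'error' as condition
    from model
    where updated_at < :error_cutoff
)
select * from check_conditions
where num_rows > 0
"""


def result_rows(ages_in_hours, warn_hours=12, error_hours=48):
    """Insert one row per age and return how many rows the test query yields."""
    conn = sqlite3.connect(":memory:")
    conn.execute("create table model (updated_at text)")
    conn.executemany(
        "insert into model values (?)",
        [((NOW - timedelta(hours=h)).isoformat(sep=" "),) for h in ages_in_hours],
    )
    params = {
        "warn_cutoff": (NOW - timedelta(hours=warn_hours)).isoformat(sep=" "),
        "error_cutoff": (NOW - timedelta(hours=error_hours)).isoformat(sep=" "),
    }
    rows = conn.execute(QUERY, params).fetchall()
    conn.close()
    return len(rows)
```

For single-row tables, `result_rows([60])`, `result_rows([36])` and `result_rows([0])` yield 2, 1 and 0 rows respectively. One thing to keep in mind is that the `where` clause filters row by row, so a table that holds both fresh and old rows, for example `result_rows([0, 60])`, still returns 2 rows. The single-row models in this walkthrough do not exercise that case.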
Let's now build our models and run our tests and see the outcome.
$ dbt build
09:59:51 Running with dbt=1.5.8
09:59:52 Registered adapter: snowflake=1.5.5
09:59:52 Found 7 models, 7 tests, 0 snapshots, 0 analyses, 326 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
09:59:52
09:59:56 Concurrency: 1 threads (target='sf')
09:59:56
09:59:56 1 of 14 START sql table model dbt_jyeo.bar_should_fail_with_error_config ....... [RUN]
09:59:58 1 of 14 OK created sql table model dbt_jyeo.bar_should_fail_with_error_config .. [SUCCESS 1 in 2.50s]
09:59:58 2 of 14 START sql table model dbt_jyeo.bar_should_pass_with_error_config ....... [RUN]
10:00:00 2 of 14 OK created sql table model dbt_jyeo.bar_should_pass_with_error_config .. [SUCCESS 1 in 2.00s]
10:00:00 3 of 14 START sql table model dbt_jyeo.bar_should_pass_with_warn_config ........ [RUN]
10:00:02 3 of 14 OK created sql table model dbt_jyeo.bar_should_pass_with_warn_config ... [SUCCESS 1 in 1.79s]
10:00:02 4 of 14 START sql table model dbt_jyeo.bar_should_warn_with_warn_config ........ [RUN]
10:00:04 4 of 14 OK created sql table model dbt_jyeo.bar_should_warn_with_warn_config ... [SUCCESS 1 in 2.18s]
10:00:04 5 of 14 START sql table model dbt_jyeo.foo_should_fail ......................... [RUN]
10:00:06 5 of 14 OK created sql table model dbt_jyeo.foo_should_fail .................... [SUCCESS 1 in 1.88s]
10:00:06 6 of 14 START sql table model dbt_jyeo.foo_should_pass ......................... [RUN]
10:00:08 6 of 14 OK created sql table model dbt_jyeo.foo_should_pass .................... [SUCCESS 1 in 2.08s]
10:00:08 7 of 14 START sql table model dbt_jyeo.foo_should_warn ......................... [RUN]
10:00:10 7 of 14 OK created sql table model dbt_jyeo.foo_should_warn .................... [SUCCESS 1 in 1.81s]
10:00:10 8 of 14 START test freshness_bar_should_fail_with_error_config_48__hour__updated_at [RUN]
10:00:12 8 of 14 FAIL 1 freshness_bar_should_fail_with_error_config_48__hour__updated_at [FAIL 1 in 1.65s]
10:00:12 9 of 14 START test freshness_bar_should_pass_with_error_config_48__hour__updated_at [RUN]
10:00:13 9 of 14 PASS freshness_bar_should_pass_with_error_config_48__hour__updated_at .. [PASS in 1.59s]
10:00:13 10 of 14 START test freshness_bar_should_pass_with_warn_config_updated_at__12__hour [RUN]
10:00:15 10 of 14 PASS freshness_bar_should_pass_with_warn_config_updated_at__12__hour .. [PASS in 1.45s]
10:00:15 11 of 14 START test freshness_bar_should_warn_with_warn_config_updated_at__12__hour [RUN]
10:00:16 11 of 14 WARN 1 freshness_bar_should_warn_with_warn_config_updated_at__12__hour [WARN 1 in 1.56s]
10:00:16 12 of 14 START test freshness_foo_should_fail_48__hour__updated_at__12__hour ... [RUN]
10:00:18 12 of 14 FAIL 2 freshness_foo_should_fail_48__hour__updated_at__12__hour ....... [FAIL 2 in 1.51s]
10:00:18 13 of 14 START test freshness_foo_should_pass_48__hour__updated_at__12__hour ... [RUN]
10:00:20 13 of 14 PASS freshness_foo_should_pass_48__hour__updated_at__12__hour ......... [PASS in 1.59s]
10:00:20 14 of 14 START test freshness_foo_should_warn_48__hour__updated_at__12__hour ... [RUN]
10:00:21 14 of 14 WARN 1 freshness_foo_should_warn_48__hour__updated_at__12__hour ....... [WARN 1 in 1.62s]
10:00:21
10:00:21 Finished running 7 table models, 7 tests in 0 hours 0 minutes and 28.64 seconds (28.64s).
10:00:21
10:00:21 Completed with 2 errors and 2 warnings:
10:00:21
10:00:21 Failure in test freshness_bar_should_fail_with_error_config_48__hour__updated_at (models/schemas.yml)
10:00:21 Got 1 result, configured to fail if =1
10:00:21
10:00:21 compiled Code at target/compiled/my_dbt_project/models/schemas.yml/freshness_bar_should_fail_with_186fbb3725139ea8c0ad8614bc48f256.sql
10:00:21
10:00:21 Failure in test freshness_foo_should_fail_48__hour__updated_at__12__hour (models/schemas.yml)
10:00:21 Got 2 results, configured to fail if =2
10:00:21
10:00:21 compiled Code at target/compiled/my_dbt_project/models/schemas.yml/freshness_foo_should_fail_48__hour__updated_at__12__hour.sql
10:00:21
10:00:21 Warning in test freshness_bar_should_warn_with_warn_config_updated_at__12__hour (models/schemas.yml)
10:00:21 Got 1 result, configured to warn if =1
10:00:21
10:00:21 compiled Code at target/compiled/my_dbt_project/models/schemas.yml/freshness_bar_should_warn_with_warn_config_updated_at__12__hour.sql
10:00:21
10:00:21 Warning in test freshness_foo_should_warn_48__hour__updated_at__12__hour (models/schemas.yml)
10:00:21 Got 1 result, configured to warn if =1
10:00:21
10:00:21 compiled Code at target/compiled/my_dbt_project/models/schemas.yml/freshness_foo_should_warn_48__hour__updated_at__12__hour.sql
10:00:21
10:00:21 Done. PASS=10 WARN=2 ERROR=2 SKIP=0 TOTAL=14
As we can see:
- Any test that we wanted to have failed, has failed.
- Any test that we wanted to have warned, has warned.
- And any test that we wanted to have passed, has passed.
Thanks for posting this though btw - very helpful for applying source freshness on Snowflake stage sources for S3 - couldn't seem to get dbt source freshness command to work for this.