In the context of Materialize #26881, I'm walking through my understanding of how the proposed design would handle schema evolution using dbt-materialize.

- With the current proposal, we will automatically create a table (or multiple tables, for multi-output sources) the first time someone types `CREATE SOURCE`:

  ```sql
  -- This creates a source `kafka_src` + a table `t`, which is what you will
  -- query as a user.
  CREATE SOURCE kafka_src
    FROM KAFKA CONNECTION kafka_conn (TOPIC 't');
  ```

  This will be the `v1` of the source table for dbt purposes. This `v1` won't actually exist as a model in the dbt context, much like today (see dbt-core #6185). You'll pull it into the dbt context (and make it `source()`-able) by declaring it as a dbt source in a `sources.yml` file:

  ```yaml
  sources:
    - name: kafka_src
      schema: "{{ target.schema }}"
      tables:
        - name: t
  ```

  It isn't possible to define contracts on dbt sources (see dbt-core #9607), so this is just implied as the baseline schema version. Downstream models will reference this table as `source('kafka_src', 't')`.

- When a new schema is available upstream for `kafka_src`, you'll need to explicitly create a model `t_v1` that runs the `CREATE TABLE FROM SOURCE` statement under the hood to spin up a new (reclocked) version of the source with the new schema. Let's assume we override the existing `table` materialization for this:

  ```sql
  -- t_v1.sql
  {{ config(materialized='table') }}

  FROM SOURCE kafka_src ("t")
  ```

  Side note: I'm not convinced this should override the base `table` materialization, as it'd likely be helpful to explicitly use a custom materialization tailored specifically to source tables (i.e. read-only), and we might eventually support `CREATE TABLE AS SELECT` semantics, but...you can't define contracts on custom materializations (AFAIK!), so this might just need to be the stopgap (see the rough sketch after this list). What this means in terms of a (possible) loss of dbt compatibility is a separate question we should consider.

- You'll then need to enforce a contract for the new table version in `schema.yml`, but...`t` isn't technically a model and can't be versioned, so you can't use it as the base model. Unless I'm missing something, and unless dbt introduces support for enforcing contracts on dbt sources, there isn't an easy way to version the original table that is created when you type `CREATE SOURCE`.

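
As the rough sketch promised in the side note above: a dedicated, read-only `source_table` materialization could wrap the compiled model body in `CREATE TABLE ... FROM SOURCE`. Everything here is illustrative: the materialization name and overall shape are assumptions on my part, not the dbt-materialize implementation.

```sql
-- macros/materializations/source_table.sql (hypothetical sketch)
{% materialization source_table, adapter='materialize' %}

  {# The target table takes on the model's database/schema/identifier. #}
  {%- set target_relation = this.incorporate(type='table') -%}

  {# `sql` is the compiled model body, e.g. `FROM SOURCE kafka_src ("t")`. #}
  {% call statement('main') -%}
    CREATE TABLE {{ target_relation }} {{ sql }}
  {%- endcall %}

  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
```

A model would then opt in with `{{ config(materialized='source_table') }}` instead of overloading `table`, which keeps the read-only semantics explicit.
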
An admittedly ugly but potentially workable solution with the current state of things would be to pull the original source table in as a view model:

```sql
-- kafka_t.sql
{{ config(materialized='view') }}

SELECT * FROM {{ source('kafka_src', 't') }}
```

You'd then enforce a contract on this model in `schema.yml`:

```yaml
models:
  - name: kafka_t
    config:
      materialized: view
      contract: {enforced: true}
    columns:
      - name: customer_id
        data_type: int
      - name: country_name
        data_type: varchar
```

You could then reference this view in downstream models using `ref('kafka_t')`, and make it so that any new versions of the model just use a different materialization type:

```sql
-- kafka_t_v2.sql
{{ config(materialized='table') }}

FROM SOURCE kafka_src ("t")
```

The `models/schema.yml` file would look something like:

```yaml
models:
  - name: kafka_t
    # Pin the initial version as the latest.
    latest_version: 1
    config:
      materialized: view
      contract: {enforced: true}
    columns:
      - name: customer_id
        data_type: int
      - name: country_name
        data_type: varchar
    versions:
      - v: 1
        config:
          alias: kafka_t
        # Matches the original contract definition -- nothing more needed.
      - v: 2
        config:
          materialized: table
        # New columns added upstream (on top of the baseline columns above).
        columns:
          - name: birth_date
            data_type: date
          - name: email_preferences
            data_type: varchar
```

Any downstream models using `ref('kafka_t')` would keep using the original schema version. Downstream consumers who want to incorporate the new columns into their models would change their dependencies to use `v=2`:

```sql
-- transform_1.sql
{{ config(materialized='view') }}

SELECT ...
FROM {{ ref('kafka_t', v=2) }}
```

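For clarity, assuming dbt's default `<model>_v<version>` relation naming for versioned models (and the explicit `alias: kafka_t` pin on `v1` above), the two refs would compile to something like the following; the database and schema names are just placeholders:

```sql
-- Illustrative compiled output (database/schema names are placeholders).
-- ref('kafka_t')      -> the pinned latest version (v1), aliased to the original name:
SELECT ... FROM "materialize"."public"."kafka_t"

-- ref('kafka_t', v=2) -> the new table version:
SELECT ... FROM "materialize"."public"."kafka_t_v2"
```
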
These changes could then be deployed using our current blue/green deployment workflow, provided that the source tables aren't co-located with the dependent objects.

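To make that last constraint concrete: the dbt-materialize blue/green workflow swaps a set of clusters and schemas declared in a `deployment` variable, so the trick is to keep the source tables out of that set. The exact keys below are from memory and may not match the current docs, so treat this as a sketch rather than a recipe:

```yaml
# dbt_project.yml (illustrative; check the dbt-materialize docs for the exact keys)
vars:
  deployment:
    default:
      # Swap only the environment that holds the dependent objects...
      clusters:
        - compute
      schemas:
        - analytics
      # ...and leave the cluster/schema holding the source tables out of it.
```

If I'm remembering the tooling correctly, the `deploy_init`, `deploy_promote`, and `deploy_cleanup` run-operations then handle spinning up, cutting over, and tearing down the deployment environment.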