(Source) Table schema evolution via dbt model versioning

In the context of Materialize #26881, this walks through my understanding of how the proposed design would handle schema evolution using dbt-materialize.

OOTB

  1. With the current proposal, we will automatically create a table (or multiple tables, for multi-output sources) the first time someone types CREATE SOURCE:

    -- This creates a source `kafka_src` + a table `t`, which is what you will
    -- query as a user.
    CREATE SOURCE kafka_src
    FROM KAFKA CONNECTION kafka_conn (TOPIC 't')

    This will be the v1 of the source table, for dbt purposes — this v1 won't actually exist as a model in the dbt context, much like today (see dbt-core #6185). You'll pull it into the dbt context (and make it ref()-able) by declaring it as a dbt source in a sources.yml file:

    sources:
      - name: kafka_src
        schema: "{{ target.schema }}"
        tables:
          - name: t

    It isn't possible to define contracts on dbt sources (see dbt-core #9607), so this is just implied as the baseline schema version. Downstream models will reference this table as source('kafka_src','t').

  2. When a new schema is available upstream for kafka_src, you'll need to explicitly create a model t_v1 that runs the CREATE TABLE FROM SOURCE statement under the hood (see the DDL sketch after this list) to spin up a new (reclocked) version of the source with the new schema. Let's assume we override the existing table materialization for this:

    --t_v1.sql
    {{ config(materialized='table') }}
    
    FROM SOURCE kafka_src ("t")

    Side note: I'm not convinced this should override the base table materialization. It'd likely be more helpful to use a custom materialization tailored specifically to source tables (i.e. read-only), and we might eventually support CREATE TABLE AS SELECT semantics, but...you can't define contracts on custom materializations (AFAIK!), so overriding the table materialization might just need to be the stopgap. What this means in terms of (possible) loss of dbt compatibility is a separate question we should consider.

  3. You'll then need to enforce a contract for the new table version in schema.yml, but...t isn't technically a model, so it can't be versioned and can't serve as the base model. Unless I'm missing something (and unless dbt introduces support for enforcing contracts on dbt sources), there isn't an easy way to version the original table that is created when you type CREATE SOURCE.
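
For reference, here's a rough sketch of the DDL the t_v1 model from step 2 would run under the hood. The exact statement is my assumption and depends on the final syntax that lands in #26881:

-- Roughly what the table materialization would execute for t_v1 (sketch;
-- composed from the model body above and the proposed CREATE TABLE FROM SOURCE).
CREATE TABLE t_v1 FROM SOURCE kafka_src ("t");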

What now?

An admittedly ugly but potentially workable solution with the current state of things would be to pull the original source table in as a view model:

--kafka_t.sql
{{ config(materialized='view') }}

SELECT * FROM {{ source('kafka_src','t') }}

You'd then enforce a contract on this model in models/schema.yml:

models:
  - name: kafka_t
    config:
      materialized: view
      contract: {enforced: true}
    columns:
      - name: customer_id
        data_type: int
      - name: country_name
        data_type: varchar

You could then reference this view in downstream models using ref('kafka_t'), and make it so that any new versions of the model just use a different materialization type:

   --kafka_t_v2.sql
   {{ config(materialized='table') }}

   FROM SOURCE kafka_src ("t")

The models/schema.yml file would look something like:

models:
  - name: kafka_t
    # Pin the initial version as the latest.
    latest_version: 1
    config:
      materialized: view
      contract: {enforced: true}
    columns:
      - name: customer_id
        data_type: int
      - name: country_name
        data_type: varchar

    versions:
      - v: 1
        config:
          alias: kafka_t
          # Matches the original contract definition -- nothing more needed.

      - v: 2
        config:
          materialized: table
        columns:
          # Carry over the v1 columns...
          - include: all
          # ...plus the new columns added upstream.
          - name: birth_date
            data_type: date
          - name: email_preferences
            data_type: varchar

Any downstream models using ref('kafka_t') would keep using the original schema version. Downstream consumers who want to incorporate the new columns into their models would change their dependencies to use v=2:

--transform_1.sql
{{ config(materialized='view') }}
SELECT ...
FROM {{ ref('kafka_t', v=2) }}
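
For contrast, a model that hasn't opted in would keep the unpinned ref, which resolves to the pinned latest version (v1 here). A minimal sketch, using a hypothetical transform_0 model:

--transform_0.sql
{{ config(materialized='view') }}
SELECT ...
FROM {{ ref('kafka_t') }}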

These changes could then be deployed using our current blue/green deployment workflow, provided that the source tables aren't co-located with the dependent objects.
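
For context, a rough sketch of what that could look like on the dbt side, assuming the deployment vars and the deploy_* run-operations from the dbt-materialize blue/green workflow (cluster and schema names are placeholders):

# dbt_project.yml (sketch)
vars:
  deployment:
    default:
      # The environment that gets swapped on promotion; per the caveat above,
      # it shouldn't contain the source tables themselves.
      clusters:
        - prod
      schemas:
        - prod

You'd then run dbt run-operation deploy_init, build and validate the new model versions against the deployment environment, and run dbt run-operation deploy_promote to cut over.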
