Marta Paes (morsapaes)

(Source) Table schema evolution via dbt model versioning

In the context of Materialize #26881, this walks through my understanding of how the proposed design would handle schema evolution when using dbt-materialize.

OOTB

  1. With the current proposal, we will automatically create a table (or multiple tables, for multi-output sources) the first time someone types CREATE SOURCE:

    -- This creates a source `kafka_src` + a table `t`, which is what you will query downstream.
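
A sketch of what that could look like end to end, assuming Materialize's Kafka source syntax (the connection name and the format are illustrative, not taken from the proposal):

    CREATE SOURCE kafka_src
    FROM KAFKA CONNECTION kafka_conn (TOPIC 't')
    FORMAT JSON;

    -- The source's output is exposed through the auto-created table `t`,
    -- so a downstream dbt model can reference it directly:
    SELECT * FROM t;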

SQL loops in dbt

One handy (and scary 👻) thing that Jinja adds on top of SQL is the ability to run for loops. Here's a rough example that shows how to simplify the statement Frank provided in models__staging__mqis_raw_fiscal_area_shift.sql using loops:

  1. Create a .csv file with the sid_id-to-time_zone mapping (dim_plant_timezones.csv) and place it under the /seeds directory. The mapping could be embedded directly into the model, but the seed approach means the file is checked into version control and can be reused across models (see the loop sketch after the mapping below):
sid_id,time_zone
22,america/denver
23,america/chicago
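
The loop itself could then look roughly like the sketch below. The upstream model (stg_fiscal_area_shift) and the timestamp column (shift_start) are stand-ins for the real names in Frank's statement:

{% set tz_query %}
SELECT sid_id, time_zone FROM {{ ref('dim_plant_timezones') }}
{% endset %}

{% if execute %}
    {% set mappings = run_query(tz_query).rows %}
{% else %}
    {% set mappings = [] %}
{% endif %}

SELECT
    sid_id,
    CASE
    {% for row in mappings %}
        WHEN sid_id = {{ row['sid_id'] }}
            THEN shift_start AT TIME ZONE '{{ row['time_zone'] }}'
    {% endfor %}
    END AS shift_start_local
FROM {{ ref('stg_fiscal_area_shift') }}

Because run_query runs when the model is compiled, the CASE branches are generated from whatever rows the seed contains, so onboarding a new plant is just a new line in the .csv.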

Evolving the dbt-materialize adapter

Tracking issue: #10600

Some things to consider:

Although sources and sinks are conceptually inverses, sources have a one-to-many relationship with downstream relations, while sinks have a one-to-one relationship with upstream relations. Relations, in turn, have a zero-to-many relationship with downstream sinks, which gets in the way of implementing sources and sinks as inverse dbt concepts (e.g. via pre- and post-hooks).

Something else to consider is that source and sink configuration might have different ownership than model development in the wild (e.g. data engineers vs. analytics engineers), so it'd be preferable not to tightly couple them.
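
To make the cardinalities concrete in plain Materialize SQL (all object names below are illustrative): one source can feed many relations, every sink reads from exactly one relation, and a relation can feed zero or many sinks:

CREATE SOURCE src
FROM KAFKA BROKER 'broker:9092' TOPIC 'in'
FORMAT BYTES;

-- One source, many downstream relations.
CREATE MATERIALIZED VIEW v1 AS SELECT * FROM src;
CREATE MATERIALIZED VIEW v2 AS SELECT * FROM src;

-- Each sink reads from exactly one relation, but v1 feeds two sinks
-- (and v2 feeds none).
CREATE SINK s1 FROM v1
INTO KAFKA BROKER 'broker:9092' TOPIC 'out1'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081';

CREATE SINK s2 FROM v1
INTO KAFKA BROKER 'broker:9092' TOPIC 'out2'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081';

Attaching s1 and s2 to v1 as post-hooks would work here, but a relation that feeds zero sinks (v2) or several sinks (v1) doesn't map cleanly onto a single model's hooks, which is the asymmetry described above.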

Streaming SQL from a Twitter analytics demo on Materialize

-- Ingest the raw tweets from a Redpanda topic as bytes.
CREATE SOURCE rp_twitter_tweets
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'dc_tweets'
FORMAT BYTES;

-- Decode the bytes and pull typed columns out of the JSON payload.
CREATE MATERIALIZED VIEW twitter_tweets AS
SELECT (data->>'id')::bigint AS tweet_id,
       (data->'referenced_tweets'->0->>'type')::string AS tweet_type,
       (data->>'text')::string AS tweet_text,
       (data->'referenced_tweets'->0->>'id')::string AS tweet_id_rr,
       (data->>'author_id')::bigint AS user_id,
       (data->'geo'->>'place_id')::string AS place_id,
       (data->>'created_at')::timestamp AS created_at
FROM (SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM rp_twitter_tweets) AS decoded;

CREATE VIEW twitter_tweets_enriched AS
SELECT tweet_text AS tweet,
       username,
       CASE WHEN tweet_type = 'quoted' THEN 'quoted retweet'
            WHEN tweet_type = 'replied to' THEN 'tweet reply'
            ELSE 'tweet'
       END AS tweet_type,
       created_at
FROM twitter_tweets tt
--This is a streaming join! (the twitter_users relation is assumed here;
--it's what provides `username`)
JOIN twitter_users tu ON tt.user_id = tu.user_id;

CREATE MATERIALIZED VIEW agg_tweets AS
SELECT COUNT(tweet) AS total_tweets,
       username
FROM twitter_tweets_enriched
GROUP BY username;

CREATE MATERIALIZED VIEW agg_users AS
SELECT user_id,
       COUNT(tweet_id) AS total_tweets
FROM twitter_tweets
GROUP BY user_id;

CREATE MATERIALIZED VIEW tweets_hourly AS
SELECT date_bin(interval '1 hour', created_at, '2022-03-22') AS time_bucket,
       COUNT(tweet_id) AS total_tweets
FROM twitter_tweets
GROUP BY 1;

--Tweets from the users in `users_not_there`, most recent first.
SELECT *
FROM twitter_tweets_enriched
WHERE username IN (SELECT username FROM users_not_there)
ORDER BY created_at DESC;

Queries from a gaming-streams demo

--What are the top 10 games being played?
SELECT game_name,
       cnt_streams,
       agg_viewer_cnt
FROM mv_agg_stream_game
ORDER BY agg_viewer_cnt DESC
LIMIT 10;
--Is anyone playing DOOM?
SELECT game_name,