@rockwotj
Created January 29, 2025 22:19
input:
  postgres_cdc:
    dsn: "${POSTGRES_DSN}"
    stream_snapshot: true
    schema: public
    tables:
      - demo_table
      - other_table
    slot_name: rp_connect_repl_slot
    snapshot_batch_size: 1024
    checkpoint_limit: 1024
    batching:
      count: 100000
      period: 10s

output:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKER}"]
    topic: 'cdc.${!@table}'
    compression: zstd
    metadata:
      include_prefixes:
        - operation

http:
  enabled: false
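
# A minimal sketch of the Postgres-side prerequisites, assuming a vanilla
# Postgres instance: logical decoding must be enabled (wal_level = logical) and
# the connecting role needs the REPLICATION attribute, otherwise the
# postgres_cdc input cannot create or attach to the rp_connect_repl_slot
# replication slot. A quick check:
#
#   SHOW wal_level;                                      -- should return 'logical'
#   SELECT slot_name, active FROM pg_replication_slots;  -- slot appears once the pipeline runs
#
# The next config is a standalone generator that seeds the two source tables
# (demo_table and other_table) with fake rows so the CDC stream has data to ship.
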
input:
  generate:
    interval: '1s'
    mapping: |
      root = {
        "user_id": uuid_v4(),
        "ts": now(),
        "data": {
          "author": fake("name"),
          "title": fake("sentence"),
          "body": fake("paragraph"),
        },
        "foo": fake("sentence"),
      }

pipeline:
  processors:
    - log:
        message: 'writing: ${!this}'

output:
  broker:
    outputs:
      - sql_insert:
          driver: postgres
          dsn: "${POSTGRES_DSN}"
          init_statement: |
            CREATE TABLE IF NOT EXISTS demo_table (
              user_id uuid NOT NULL,
              entity_id serial NOT NULL,
              data jsonb,
              ts timestamp,
              foo text,
              primary key (user_id, entity_id)
            );
          table: demo_table
          columns: [user_id, data, ts, foo]
          args_mapping: root = [this.user_id, this.data.format_json(), this.ts, this.foo]
      - sql_insert:
          driver: postgres
          dsn: "${POSTGRES_DSN}"
          init_statement: |
            CREATE TABLE IF NOT EXISTS other_table (
              user_id uuid NOT NULL,
              foo text,
              ts timestamp,
              primary key (user_id)
            );
          table: other_table
          columns: [user_id, foo, ts]
          args_mapping: root = [this.user_id, this.foo, this.ts]

http:
  enabled: false
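
# Hedged usage sketch: the three configs in this gist are separate Redpanda
# Connect streams, so each runs as its own process, e.g. (command shape and file
# names are assumptions for illustration; adjust to however you invoke Redpanda
# Connect):
#
#   rpk connect run ./postgres_cdc.yaml            # Postgres CDC -> Redpanda topics
#   rpk connect run ./generator.yaml               # seeds demo_table and other_table
#   rpk connect run ./redpanda_to_snowflake.yaml   # the config below
#
# The final config below reads the cdc.* topics back out of Redpanda, lands every
# change event in a <table>_staging table via Snowpipe Streaming, and then merges
# the deltas into the final Snowflake table.
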
input:
  kafka_franz:
    seed_brokers: ['${REDPANDA_BROKER}']
    topics: ['cdc\..*']
    regexp_topics: true
    consumer_group: 'rp2snow_group'
    batching:
      count: 100000
      period: 10s

pipeline:
  processors:
    - mutation: |
        root._operation = @operation
        root._offset = @kafka_offset
        meta table = @kafka_topic.trim_prefix("cdc.")
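    # Illustrative only: after this mutation a change event for demo_table looks
    # roughly like the following (values made up), with the source table name
    # carried in the `table` metadata ("demo_table"):
    #   {"user_id": "4f0c…", "entity_id": 7, "data": {…}, "ts": "…", "foo": "…",
    #    "_operation": "update", "_offset": 1234}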

output:
  broker:
    pattern: fan_out_sequential
    outputs:
      # First land the initial data into the staging table with the raw
      # data as well as the _operation metadata. We'll use the _operation
      # metadata to apply the delta operations from the CDC stream into
      # our table in Snowflake.
      - label: 'snowpipe_streaming'
        snowflake_streaming:
          account: '${SNOWFLAKE_ACCOUNT_ID}'
          user: '${SNOWFLAKE_USER}'
          role: '${SNOWFLAKE_ROLE}'
          database: cdc_db
          schema: public
          table: ${!@table}_staging
          private_key_file: './rsa_key.p8'
          schema_evolution:
            enabled: true
            # Processors to run during schema evolution
            processors:
              - mutation: |
                  root.type = match this.value.type() {
                    this == "string" => "STRING"
                    this == "bytes" => "BINARY"
                    this == "number" => "DOUBLE"
                    this == "bool" => "BOOLEAN"
                    this == "timestamp" => "TIMESTAMP"
                    _ => "VARIANT"
                  }
              - branch:
                  # Only run for columns that don't start with _
                  request_map: |
                    root = if this.name.has_prefix("_") {
                      deleted()
                    } else {
                      this
                    }
                  # Then run some SQL to evolve the final table schema
                  processors:
                    - sql_raw:
                        driver: snowflake
                        dsn: ${SNOWFLAKE_DSN}
                        unsafe_dynamic_query: true
                        queries:
                          - query: |
                              CREATE TABLE IF NOT EXISTS ${!@table} (
                                ${!this.name} ${!this.type}
                              )
                            exec_only: true
                          - query: |
                              ALTER TABLE IF EXISTS ${!@table}
                              ADD COLUMN IF NOT EXISTS ${!this.name} ${!this.type}
                            exec_only: true
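                          # Illustrative only: when schema evolution sees a new string
                          # column named `foo` on demo_table, the second query expands
                          # to roughly:
                          #   ALTER TABLE IF EXISTS demo_table ADD COLUMN IF NOT EXISTS foo STRING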
              - mapping: |
                  root = this.type
          max_in_flight: 4
      - processors:
          # Collapse our batch into a single array
          - archive:
              format: json_array
          # Compute all the keys that are present in our batch. This will
          # tell us if new columns have been added or not
          - mapping: |
              let all_columns = this.fold([], item -> item.tally.concat(item.value.keys()).unique())
              root.columns = $all_columns.filter(col -> !col.has_prefix("_"))
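          # Illustrative only: for a batch of demo_table change events the result is
          # something like columns: ["user_id", "entity_id", "data", "ts", "foo"];
          # the _operation/_offset bookkeeping fields are filtered out here.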
          # First we need to get the primary key columns from the Postgres table so
          # we know how to match rows
          - branch:
              processors:
                # Cache it because this won't change
                - cached:
                    cache: inmem
                    key: ${!@table}_pk
                    processors:
                      - sql_raw:
                          driver: postgres
                          dsn: "${POSTGRES_DSN}"
                          unsafe_dynamic_query: true
                          query: |
                            SELECT
                              kcu.column_name AS column_name
                            FROM
                              information_schema.table_constraints tc
                              JOIN information_schema.key_column_usage kcu
                                ON tc.constraint_name = kcu.constraint_name
                                AND tc.table_schema = kcu.table_schema
                            WHERE
                              tc.constraint_type = 'PRIMARY KEY'
                              AND tc.table_name = '${!@table}'
                              AND tc.table_schema = 'public'
                            ORDER BY
                              kcu.ordinal_position;
              result_map: |
                root.pk = this.map_each(row -> row.column_name)
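          # Illustrative only: for demo_table this lookup yields pk: ["user_id", "entity_id"],
          # matching the composite primary key created by the generator config above.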
          - mapping: |
              root.statement = """
                merge into TABLE_NAME using (
                  select *
                  from STAGING_TABLE_NAME
                  qualify row_number() over (partition by PK order by _offset desc) = 1
                ) STAGING_TABLE_NAME
                on (STAGING_TABLE_PK) = (TABLE_PK)
                when matched and STAGING_TABLE_NAME._operation = 'delete' then delete
                when matched and STAGING_TABLE_NAME._operation = 'update' then update set UPDATE_CLAUSE
                when not matched then insert (ALL_COLUMNS) values (ALL_STAGING_TABLE_COLUMNS)
              """.replace_all_many([
                "STAGING_TABLE_PK", this.pk.map_each(col -> @table + "_staging." + col).join(", "),
                "TABLE_PK", this.pk.map_each(col -> @table + "." + col).join(", "),
                "PK", this.pk.join(", "),
                "ALL_COLUMNS", this.columns.join(", "),
                "ALL_STAGING_TABLE_COLUMNS", this.columns.map_each(col -> @table + "_staging." + col).join(", "),
                "UPDATE_CLAUSE", this.columns.map_each(col -> col + " = " + @table + "_staging." + col).join(", "),
                "STAGING_TABLE_NAME", @table + "_staging",
                "TABLE_NAME", @table,
              ])
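          # Illustrative only: with pk ["user_id", "entity_id"] and columns
          # ["user_id", "entity_id", "data", "ts", "foo"], the template above renders
          # to roughly this statement (the QUALIFY dedupe keeps only the newest change
          # per key, ordered by _offset, before it is applied):
          #
          #   merge into demo_table using (
          #     select *
          #     from demo_table_staging
          #     qualify row_number() over (partition by user_id, entity_id order by _offset desc) = 1
          #   ) demo_table_staging
          #   on (demo_table_staging.user_id, demo_table_staging.entity_id) = (demo_table.user_id, demo_table.entity_id)
          #   when matched and demo_table_staging._operation = 'delete' then delete
          #   when matched and demo_table_staging._operation = 'update' then update set
          #     user_id = demo_table_staging.user_id, entity_id = demo_table_staging.entity_id,
          #     data = demo_table_staging.data, ts = demo_table_staging.ts, foo = demo_table_staging.foo
          #   when not matched then insert (user_id, entity_id, data, ts, foo)
          #     values (demo_table_staging.user_id, demo_table_staging.entity_id,
          #             demo_table_staging.data, demo_table_staging.ts, demo_table_staging.foo)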
        # Transactionally execute the merge and then truncate the staging table
        # ready for the next batch.
        label: 'snowflake_merge'
        sql_raw:
          driver: snowflake
          dsn: "${SNOWFLAKE_DSN}"
          unsafe_dynamic_query: true
          queries:
            - query: '${!this.statement}'
            - query: 'TRUNCATE ${!@table}_staging'

cache_resources:
  - label: inmem
    memory:
      # disable TTL
      compaction_interval: ''

http:
  enabled: false