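A Postgres → Redpanda → Snowflake CDC pipeline built from three Redpanda Connect configs: the first streams Postgres changes into per-table Redpanda topics, the second generates demo data into the source tables, and the third replicates the change stream into Snowflake with a staging-table merge.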
input:
  postgres_cdc:
    dsn: "${POSTGRES_DSN}"
    stream_snapshot: true
    schema: public
    tables:
      - demo_table
      - other_table
    slot_name: rp_connect_repl_slot
    snapshot_batch_size: 1024
    checkpoint_limit: 1024
    batching:
      count: 100000
      period: 10s

output:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKER}"]
    topic: 'cdc.${!@table}'
    compression: zstd
    metadata:
      include_prefixes:
        - operation

http:
  enabled: false
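The postgres_cdc input requires logical replication to be enabled on the source database before it can stream from the replication slot named above. A minimal sketch of the server-side prerequisite, assuming you have permission to change server settings:

    -- Logical replication must be on for replication slots to deliver
    -- row-level changes; Postgres must be restarted for this to apply.
    ALTER SYSTEM SET wal_level = logical;

    -- Confirm after the restart:
    SHOW wal_level;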
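The second config generates fake documents and fans each one out to two Postgres tables (creating them on first use), so the CDC stream above has something to capture.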
input:
  generate:
    interval: '1s'
    mapping: |
      root = {
        "user_id": uuid_v4(),
        "ts": now(),
        "data": {
          "author": fake("name"),
          "title": fake("sentence"),
          "body": fake("paragraph"),
        },
        "foo": fake("sentence"),
      }

pipeline:
  processors:
    - log:
        message: 'writing: ${!this}'

output:
  # The default broker pattern (fan_out) writes every generated document
  # to both tables.
  broker:
    outputs:
      - sql_insert:
          driver: postgres
          dsn: "${POSTGRES_DSN}"
          init_statement: |
            CREATE TABLE IF NOT EXISTS demo_table (
              user_id uuid NOT NULL,
              entity_id serial NOT NULL,
              data jsonb,
              ts timestamp,
              foo text,
              primary key (user_id, entity_id)
            );
          table: demo_table
          columns: [user_id, data, ts, foo]
          args_mapping: root = [this.user_id, this.data.format_json(), this.ts, this.foo]
      - sql_insert:
          driver: postgres
          dsn: "${POSTGRES_DSN}"
          init_statement: |
            CREATE TABLE IF NOT EXISTS other_table (
              user_id uuid NOT NULL,
              foo text,
              ts timestamp,
              primary key (user_id)
            );
          table: other_table
          columns: [user_id, foo, ts]
          args_mapping: root = [this.user_id, this.foo, this.ts]

http:
  enabled: false
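Once the generator is running you can sanity-check the source data directly in Postgres; a quick, hypothetical spot check (entity_id is a serial, so descending order shows the newest rows):

    -- Peek at recent generated rows, pulling one field out of the jsonb blob.
    SELECT user_id, ts, data->>'author' AS author
    FROM demo_table
    ORDER BY entity_id DESC
    LIMIT 5;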
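The third config consumes the CDC topics and lands them in Snowflake: each batch is first written to a per-table staging table via Snowpipe Streaming, then merged into the final table using the _operation metadata.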
input:
  kafka_franz:
    seed_brokers: ['${REDPANDA_BROKER}']
    topics: ['cdc\..*']
    regexp_topics: true
    consumer_group: 'rp2snow_group'
    batching:
      count: 100000
      period: 10s

pipeline:
  processors:
    # Copy the CDC metadata onto the payload so it lands in the staging
    # table, and recover the source table name from the topic name.
    - mutation: |
        root._operation = @operation
        root._offset = @kafka_offset
        meta table = @kafka_topic.trim_prefix("cdc.")
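# For example (illustrative values): an update event consumed from
# cdc.demo_table leaves this processor as the original row payload plus
# "_operation": "update" and the record's Kafka "_offset", with metadata
# table=demo_table.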
output:
  broker:
    pattern: fan_out_sequential
    outputs:
      # First land the raw data in the staging table along with the
      # _operation metadata. We'll use _operation to apply the delta
      # operations from the CDC stream to our final table in Snowflake.
      - label: 'snowpipe_streaming'
        snowflake_streaming:
          account: '${SNOWFLAKE_ACCOUNT_ID}'
          user: '${SNOWFLAKE_USER}'
          role: '${SNOWFLAKE_ROLE}'
          database: cdc_db
          schema: public
          table: ${!@table}_staging
          private_key_file: './rsa_key.p8'
          schema_evolution:
            enabled: true
            # Processors to run during schema evolution: map the Bloblang
            # type of the new value to a Snowflake column type.
            processors:
              - mutation: |
                  root.type = match this.value.type() {
                    this == "string" => "STRING"
                    this == "bytes" => "BINARY"
                    this == "number" => "DOUBLE"
                    this == "bool" => "BOOLEAN"
                    this == "timestamp" => "TIMESTAMP"
                    _ => "VARIANT"
                  }
              - branch:
                  # Skip internal metadata columns (those prefixed with _);
                  # they only exist in the staging table.
                  request_map: |
                    root = if this.name.has_prefix("_") {
                      deleted()
                    } else {
                      this
                    }
                  # Then run some SQL to evolve the final table's schema in
                  # lockstep with the staging table.
                  processors:
                    - sql_raw:
                        driver: snowflake
                        dsn: ${SNOWFLAKE_DSN}
                        unsafe_dynamic_query: true
                        queries:
                          - query: |
                              CREATE TABLE IF NOT EXISTS ${!@table} (
                                ${!this.name} ${!this.type}
                              )
                            exec_only: true
                          - query: |
                              ALTER TABLE IF EXISTS ${!@table}
                              ADD COLUMN IF NOT EXISTS ${!this.name} ${!this.type}
                            exec_only: true
              # snowflake_streaming expects the resolved column type as the
              # processor output.
              - mapping: |
                  root = this.type
        max_in_flight: 4
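# As a concrete (illustrative) example: if a CDC record arrives with a
# previously unseen string field "bar", the staging table gains the column
# via schema evolution and the final table is kept in lockstep by running:
#   ALTER TABLE IF EXISTS demo_table ADD COLUMN IF NOT EXISTS bar STRING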
      # Second, merge the staged deltas into the final table.
      - processors:
          # Collapse our batch into a single array.
          - archive:
              format: json_array
          # Compute all the keys present in our batch. This tells us
          # whether new columns have been added or not.
          - mapping: |
              let all_columns = this.fold([], item -> item.tally.concat(item.value.keys()).unique())
              root.columns = $all_columns.filter(col -> !col.has_prefix("_"))
          # First we need to get the primary key columns from the Postgres
          # table so we know how to match rows.
          - branch:
              processors:
                # Cache the result, because it won't change.
                - cached:
                    cache: inmem
                    key: ${!@table}_pk
                    processors:
                      - sql_raw:
                          driver: postgres
                          dsn: "${POSTGRES_DSN}"
                          unsafe_dynamic_query: true
                          query: |
                            SELECT
                              kcu.column_name AS column_name
                            FROM
                              information_schema.table_constraints tc
                              JOIN information_schema.key_column_usage kcu
                                ON tc.constraint_name = kcu.constraint_name
                                AND tc.table_schema = kcu.table_schema
                            WHERE
                              tc.constraint_type = 'PRIMARY KEY'
                              AND tc.table_name = '${!@table}'
                              AND tc.table_schema = 'public'
                            ORDER BY
                              kcu.ordinal_position;
              result_map: |
                root.pk = this.map_each(row -> row.column_name)
          # Build the MERGE statement by splicing the table name, primary
          # key, and column lists into a template.
          - mapping: |
              root.statement = """
                merge into TABLE_NAME using (
                  select *
                  from STAGING_TABLE_NAME
                  qualify row_number() over (partition by PK order by _offset desc) = 1
                ) STAGING_TABLE_NAME
                on (STAGING_TABLE_PK) = (TABLE_PK)
                when matched and STAGING_TABLE_NAME._operation = 'delete' then delete
                when matched and STAGING_TABLE_NAME._operation = 'update' then update set UPDATE_CLAUSE
                when not matched then insert (ALL_COLUMNS) values (ALL_STAGING_TABLE_COLUMNS)
              """.replace_all_many([
                "STAGING_TABLE_PK", this.pk.map_each(col -> @table + "_staging." + col).join(", "),
                "TABLE_PK", this.pk.map_each(col -> @table + "." + col).join(", "),
                "PK", this.pk.join(", "),
                "ALL_COLUMNS", this.columns.join(", "),
                "ALL_STAGING_TABLE_COLUMNS", this.columns.map_each(col -> @table + "_staging." + col).join(", "),
                "UPDATE_CLAUSE", this.columns.map_each(col -> col + " = " + @table + "_staging." + col).join(", "),
                "STAGING_TABLE_NAME", @table + "_staging",
                "TABLE_NAME", @table,
              ])
        # Transactionally execute the merge, then truncate the staging
        # table ready for the next batch.
        label: 'snowflake_merge'
        sql_raw:
          driver: snowflake
          dsn: "${SNOWFLAKE_DSN}"
          unsafe_dynamic_query: true
          queries:
            - query: '${!this.statement}'
            - query: 'TRUNCATE ${!@table}_staging'

cache_resources:
  - label: inmem
    memory:
      # Disable TTL compaction; the cached primary keys never expire.
      compaction_interval: ''

http:
  enabled: false
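To make the templated merge concrete, here is roughly what the generated statement looks like for demo_table, assuming the primary key lookup returns (user_id, entity_id) and the batch contains the columns user_id, entity_id, data, ts, and foo (illustrative; the real statement depends on the batch contents):

    merge into demo_table using (
      select *
      from demo_table_staging
      qualify row_number() over (partition by user_id, entity_id order by _offset desc) = 1
    ) demo_table_staging
    on (demo_table_staging.user_id, demo_table_staging.entity_id) = (demo_table.user_id, demo_table.entity_id)
    when matched and demo_table_staging._operation = 'delete' then delete
    when matched and demo_table_staging._operation = 'update' then update set
      user_id = demo_table_staging.user_id,
      entity_id = demo_table_staging.entity_id,
      data = demo_table_staging.data,
      ts = demo_table_staging.ts,
      foo = demo_table_staging.foo
    when not matched then insert (user_id, entity_id, data, ts, foo)
      values (demo_table_staging.user_id, demo_table_staging.entity_id,
              demo_table_staging.data, demo_table_staging.ts, demo_table_staging.foo)

The qualify clause keeps only the newest staged row per primary key (highest Kafka _offset), so a batch containing several changes to the same row applies only the latest one.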