Skip to content

Instantly share code, notes, and snippets.

@Schm1tz1
Last active September 28, 2022 16:01
Show Gist options
  • Save Schm1tz1/4225f395a716b10c477c09cf94d8f103 to your computer and use it in GitHub Desktop.
Save Schm1tz1/4225f395a716b10c477c09cf94d8f103 to your computer and use it in GitHub Desktop.
Debezium Example with CFK
---
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
name: debezium
namespace: confluent
spec:
class: io.debezium.connector.postgresql.PostgresConnector
taskMax: 1
connectClusterRef:
name: connect
configs:
database.hostname: <DB-Host>
database.port: "5432"
database.user: <DB-User>
database.password: <DB-Secret>
database.dbname: <DB-Name>
database.server.name: <DB-Server-Name>
plugin.name: pgoutput
snapshot.mode: always # for testing, default: initial
# key.converter: io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: http://schemaregistry.confluent.svc.cluster.local:8081
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: http://schemaregistry.confluent.svc.cluster.local:8081
#schema.include.list: "public,northwind"
table.include.list: "northwind.orders,northwind.customers"
# auto.register.schemas: false
# use.latest.version: true
# latest.compatibility.strict: true
# typical extraction example (https://debezium.io/documentation/reference/stable/transformations/event-flattening.html)
transforms: Unwrapper,ValueToKey,Reroute,ExtractName
transforms.Unwrapper.type: io.debezium.transforms.ExtractNewRecordState
transforms.Unwrapper.drop.tombstones: 'false'
transforms.Unwrapper.delete.handling.mode: rewrite
transforms.unwrap.add.fields: op,table,lsn,source.ts_ms
# only works for flat messages - nested structures will come with KIP-821
transforms.ValueToKey.type: org.apache.kafka.connect.transforms.ValueToKey
transforms.ValueToKey.fields: customer_id
# finally only extract the contents of the field
transforms.ExtractName.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractName.field: customer_id
# reroute based on regex
transforms.Reroute.type: io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex: schmitzi.northwind.(.*)
transforms.Reroute.topic.replacement: northwind_$1
# avoid writing the unique key fields based on source tables (see https://debezium.io/documentation/reference/stable/transformations/topic-routing.html#_ensure_unique_key)
transforms.Reroute.key.enforce.uniqueness: 'false'
# topic name suffixes
# transforms: addTopicSuffix
# transforms.addTopicSuffix.type: org.apache.kafka.connect.transforms.RegexRouter
# transforms.addTopicSuffix.regex: "(.*)"
# transforms.addTopicSuffix.replacement: "$1-raw"
# route update events to update topic
# transforms: route
# transforms.route.type: io.debezium.transforms.ContentBasedRouter
# transforms.route.language: jsr223.groovy
# transforms.route.topic.expression: "value.op == 'u' ? 'updates' : null"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment