Last active
September 28, 2022 16:01
-
-
Save Schm1tz1/4225f395a716b10c477c09cf94d8f103 to your computer and use it in GitHub Desktop.
Debezium Example with CFK
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- | |
apiVersion: platform.confluent.io/v1beta1 | |
kind: Connector | |
metadata: | |
name: debezium | |
namespace: confluent | |
spec: | |
class: io.debezium.connector.postgresql.PostgresConnector | |
taskMax: 1 | |
connectClusterRef: | |
name: connect | |
configs: | |
database.hostname: <DB-Host> | |
database.port: "5432" | |
database.user: <DB-User> | |
database.password: <DB-Secret> | |
database.dbname: <DB-Name> | |
database.server.name: <DB-Server-Name> | |
plugin.name: pgoutput | |
snapshot.mode: always # for testing, default: initial | |
# key.converter: io.confluent.connect.avro.AvroConverter | |
key.converter.schema.registry.url: http://schemaregistry.confluent.svc.cluster.local:8081 | |
key.converter: org.apache.kafka.connect.storage.StringConverter | |
value.converter: io.confluent.connect.avro.AvroConverter | |
value.converter.schema.registry.url: http://schemaregistry.confluent.svc.cluster.local:8081 | |
#schema.include.list: "public,northwind" | |
table.include.list: "northwind.orders,northwind.customers" | |
# auto.register.schemas: false | |
# use.latest.version: true | |
# latest.compatibility.strict: true | |
# typical extraction example (https://debezium.io/documentation/reference/stable/transformations/event-flattening.html) | |
transforms: Unwrapper,ValueToKey,Reroute,ExtractName | |
transforms.Unwrapper.type: io.debezium.transforms.ExtractNewRecordState | |
transforms.Unwrapper.drop.tombstones: 'false' | |
transforms.Unwrapper.delete.handling.mode: rewrite | |
transforms.unwrap.add.fields: op,table,lsn,source.ts_ms | |
# only works for flat messages - nested structures will come with KIP-821 | |
transforms.ValueToKey.type: org.apache.kafka.connect.transforms.ValueToKey | |
transforms.ValueToKey.fields: customer_id | |
# finally only extract the contents of the field | |
transforms.ExtractName.type: org.apache.kafka.connect.transforms.ExtractField$Key | |
transforms.ExtractName.field: customer_id | |
# reroute based on regex | |
transforms.Reroute.type: io.debezium.transforms.ByLogicalTableRouter | |
transforms.Reroute.topic.regex: schmitzi.northwind.(.*) | |
transforms.Reroute.topic.replacement: northwind_$1 | |
# avoid writing the unique key fields based on source tables (see https://debezium.io/documentation/reference/stable/transformations/topic-routing.html#_ensure_unique_key) | |
transforms.Reroute.key.enforce.uniqueness: 'false' | |
# topic name suffixes | |
# transforms: addTopicSuffix | |
# transforms.addTopicSuffix.type: org.apache.kafka.connect.transforms.RegexRouter | |
# transforms.addTopicSuffix.regex: "(.*)" | |
# transforms.addTopicSuffix.replacement: "$1-raw" | |
# route update events to update topic | |
# transforms: route | |
# transforms.route.type: io.debezium.transforms.ContentBasedRouter | |
# transforms.route.language: jsr223.groovy | |
# transforms.route.topic.expression: "value.op == 'u' ? 'updates' : null" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment