There are primarily two ways of setting up the Kafka Connect environment and installing the Debezium connectors: either a manual installation or using the Debezium Docker images. In this document, the manual installation procedure is described.
Installation environment requirements:
- PostgreSQL (v9.6.10)
- Postgres Debezium connector (v0.9.2)
- Kafka (v2.12-2.1.1) or Confluent Platform (community v5.1.2-2.11)
- CentOS Linux 7 (kernel: Linux 3.10.0-862.11.6.el7.x86_64)
ℹ️ Note that Java 8 or later is required to run the Debezium connectors.
The installation procedure is performed in two parts:
- The logical decoding plugin installation, where the wal2json logical decoding output plugin is installed at the PostgreSQL database.
- The Debezium connector installation, which covers the Kafka Connect environment setup and the Debezium connector installation.
Logical decoding is the process of extracting all persistent changes to a database’s tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database’s internal state.
As of PostgreSQL 9.4, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements. In the context of logical replication, a slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database. The output plugins transform the data from the write-ahead log’s internal representation into the format the consumer of a replication slot desires. Plugins are written in C, compiled, and installed on the machine which runs the PostgreSQL server, and they use a number of PostgreSQL specific APIs, as described by the PostgreSQL documentation.
Debezium’s PostgreSQL connector works with one of Debezium’s supported logical decoding plugins, protobuf or wal2json, to encode the changes in either Protobuf or JSON format respectively.
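To make the relationship between replication slots and output plugins concrete, here is a minimal SQL sketch (assuming the wal2json installation and the postgresql.conf changes described below are already in place; the slot name sketch_slot is arbitrary):
-- create a throw-away logical replication slot that uses the wal2json output plugin
SELECT * FROM pg_create_logical_replication_slot('sketch_slot', 'wal2json');
-- peek at the pending changes without consuming them
SELECT * FROM pg_logical_slot_peek_changes('sketch_slot', NULL, NULL);
-- drop the slot when done, otherwise it prevents old WAL segments from being recycled
SELECT pg_drop_replication_slot('sketch_slot');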
The Debezium logical decoding plugins have only been installed and tested on Linux machines. Windows and other operating systems may require different installation steps.
wal2json limitations: see the wal2json project documentation for the known limitations of the plugin.
ℹ️ More information about logical decoding and output plugins can be found in the PostgreSQL documentation.
In the current installation example, the wal2json output plugin for logical decoding is used. The wal2json output plugin produces a JSON object per transaction, with all of the new/old tuples available in the JSON object. The plugin compilation and installation is performed by executing the related commands extracted from the Debezium Docker image file.
Before executing the commands, make sure that the user has the privileges to write the wal2json library to the PostgreSQL lib directory (in the test environment, the directory is /usr/pgsql-9.6/lib/). Also note that the installation process requires the PostgreSQL utility pg_config. Verify that the PATH environment variable is set so that the utility can be found; if not, update the PATH environment variable appropriately. For example, in the test environment:
export PATH="$PATH:/usr/pgsql-9.6/bin"
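To verify that pg_config is resolvable and to see where make install will place the compiled library, a quick check can be run (the directory shown in the comment is the one from the test environment):
$ pg_config --pkglibdir   # prints the PostgreSQL library directory, e.g. /usr/pgsql-9.6/lib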
$ git clone https://github.com/eulerto/wal2json -b master --single-branch \
&& cd wal2json \
&& git checkout d2b7fef021c46e0d429f2c1768de361069e58696 \
&& make && make install \
&& cd .. \
&& rm -rf wal2json
Cloning into 'wal2json'...
remote: Counting objects: 445, done.
remote: Total 445 (delta 0), reused 0 (delta 0), pack-reused 445
Receiving objects: 100% (445/445), 180.70 KiB | 0 bytes/s, done.
Resolving deltas: 100% (317/317), done.
Note: checking out 'd2b7fef021c46e0d429f2c1768de361069e58696'.
You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by performing another checkout.
If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -b with the checkout command again. Example:
git checkout -b new_branch_name
HEAD is now at d2b7fef... Improve style
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -fPIC -I. -I./ -I/usr/pgsql-9.6/include/server -I/usr/pgsql-9.6/include/internal -D_GNU_SOURCE -I/usr/include/libxml2 -I/usr/include -c -o wal2json.o wal2json.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -fPIC -L/usr/pgsql-9.6/lib -Wl,--as-needed -L/usr/lib64 -Wl,--as-needed -Wl,-rpath,'/usr/pgsql-9.6/lib',--enable-new-dtags -shared -o wal2json.so wal2json.o
/usr/bin/mkdir -p '/usr/pgsql-9.6/lib'
/usr/bin/install -c -m 755 wal2json.so '/usr/pgsql-9.6/lib/'
Once the wal2json plugin has been installed, the database server should be configured. Add the following lines at the end of the postgresql.conf PostgreSQL configuration file in order to load the plugin as a shared library and to adjust some WAL and streaming replication settings. The configuration is extracted from postgresql.conf.sample. You may need to adapt it, for example if other shared_preload_libraries are already configured.
############ REPLICATION ##############
# MODULES
shared_preload_libraries = 'wal2json' # load 'wal2json' plugin (the plugin name is set at makefiles)
# REPLICATION
wal_level = logical # use logical decoding with the write-ahead log
max_wal_senders = 4 # max number of separate processes for processing WAL changes
max_replication_slots = 4 # max number of replication slots to be created for streaming WAL changes
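Note that changes to shared_preload_libraries and wal_level only take effect after a server restart. A sketch for the CentOS 7 test environment, assuming the stock PGDG service name (adjust the service name to your installation):
$ sudo systemctl restart postgresql-9.6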
Debezium needs the PostgreSQL WAL to be kept during Debezium outages. If your WAL retention is too small and the outages too long, then Debezium will not be able to recover after a restart, as it will miss part of the data changes. The usual indicator is an error similar to this thrown during startup: ERROR: requested WAL segment 000000010000000000000001 has already been removed.
When this happens, it is necessary to re-execute the snapshot of the database. It is also recommended to set the parameter wal_keep_segments = 0. Please follow the official PostgreSQL documentation for fine-tuning of WAL retention.
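A quick way to check how much WAL a replication slot is holding back is to compare its restart_lsn with the current WAL position. A sketch for the PostgreSQL 9.6 test environment (on PostgreSQL 10+ the functions are named pg_current_wal_lsn() and pg_wal_lsn_diff()):
test=# SELECT slot_name, plugin, active, restart_lsn,
              pg_xlog_location_diff(pg_current_xlog_location(), restart_lsn) AS retained_bytes
       FROM pg_replication_slots;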
💡 It is strongly recommended to read and understand the official documentation regarding the mechanics and configuration of the PostgreSQL write-ahead log.
Replication can only be performed by a database user that has appropriate permissions and only for a configured number of hosts. In order to give a user replication permissions, define a PostgreSQL role that has at least the REPLICATION and LOGIN permissions. For example:
CREATE ROLE name REPLICATION LOGIN;
💡 Superusers have by default both of the above roles.
Add the following lines at the end of the pg_hba.conf PostgreSQL configuration file, so as to configure the client authentication for the database replication. The PostgreSQL server should allow replication to take place between the server machine and the host on which the Debezium PostgreSQL connector is running.
Note that the authentication refers to the database superuser postgres. You may change this accordingly, if some other user with REPLICATION and LOGIN permissions has been created.
############ REPLICATION ##############
local replication postgres trust # allow replication for `postgres` locally
host replication postgres 127.0.0.1/32 trust # receive replication changes using `IPV4`
host replication postgres ::1/128 trust # receive replication changes using `IPV6`
💡 See the PostgreSQL documentation for more information on network masks.
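Changes to pg_hba.conf are picked up on a configuration reload, so a full server restart is not required; a minimal sketch using the built-in reload function:
test=# SELECT pg_reload_conf();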
For testing purposes, a database named test and a table named test_table are created with the following DDL commands:
CREATE DATABASE test;
CREATE TABLE test_table (
id char(10) NOT NULL,
code char(10),
PRIMARY KEY (id)
);
Test that wal2json is working properly by obtaining the test_table changes using the pg_recvlogical PostgreSQL client application, which controls PostgreSQL logical decoding streams.
Before starting, make sure that you are logged in as the user with database replication permissions, as configured in a previous step. Otherwise, the slot creation and streaming fail with the following error message:
pg_recvlogical: could not connect to server: FATAL: no pg_hba.conf entry for replication connection from host "[local]", user "root", SSL off
In the test environment, the user with replication permissions is postgres.
Also, make sure that the PATH environment variable is set so that pg_recvlogical can be found. If not, update the PATH environment variable appropriately. For example, in the test environment:
export PATH="$PATH:/usr/pgsql-9.6/bin"
- Create a slot named test_slot for the database named test, using the logical output plugin wal2json
$ pg_recvlogical -d test --slot test_slot --create-slot -P wal2json
- Begin streaming changes from the logical replication slot test_slot for the database test
$ pg_recvlogical -d test --slot test_slot --start -o pretty-print=1 -f -
- Perform the basic DML operations at test_table to trigger INSERT/UPDATE/DELETE change events
test=# INSERT INTO test_table (id, code) VALUES('id1', 'code1');
INSERT 0 1
test=# update test_table set code='code2' where id='id1';
UPDATE 1
test=# delete from test_table where id='id1';
DELETE 1
Upon the INSERT, UPDATE and DELETE events, wal2json outputs the table changes as captured by pg_recvlogical.
INSERT event:
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "test_table",
"columnnames": ["id", "code"],
"columntypes": ["character(10)", "character(10)"],
"columnvalues": ["id1 ", "code1 "]
}
]
}
UPDATE event:
{
"change": [
{
"kind": "update",
"schema": "public",
"table": "test_table",
"columnnames": ["id", "code"],
"columntypes": ["character(10)", "character(10)"],
"columnvalues": ["id1 ", "code2 "],
"oldkeys": {
"keynames": ["id"],
"keytypes": ["character(10)"],
"keyvalues": ["id1 "]
}
}
]
}
DELETE event:
{
"change": [
{
"kind": "delete",
"schema": "public",
"table": "test_table",
"oldkeys": {
"keynames": ["id"],
"keytypes": ["character(10)"],
"keyvalues": ["id1 "]
}
}
]
}
💡 Note that the REPLICA IDENTITY of the table is left at its default setting, so the oldkeys field of the UPDATE and DELETE events contains only the primary key columns (see the note on REPLICA IDENTITY below).
When the test is finished, the slot test_slot for the database test can be removed by the following command:
$ pg_recvlogical -d test --slot test_slot --drop-slot
ℹ️ REPLICA IDENTITY is a PostgreSQL-specific table-level setting which determines the amount of information that is available to logical decoding in case of UPDATE and DELETE events. You can set the replica identity to FULL so that these events contain the previous values of all the table's columns:
ALTER TABLE test_table REPLICA IDENTITY FULL;
test=# \d+ test_table
                     Table "public.test_table"
 Column |     Type      | Modifiers | Storage  | Stats target | Description
--------+---------------+-----------+----------+--------------+-------------
 id     | character(10) | not null  | extended |              |
 code   | character(10) |           | extended |              |
Indexes:
    "test_table_pkey" PRIMARY KEY, btree (id)
Replica Identity: FULL
Here is the wal2json output for an UPDATE after setting REPLICA IDENTITY FULL; note that oldkeys now contains all of the table's columns:
{
"change": [
{
"kind": "update",
"schema": "public",
"table": "test_table",
"columnnames": ["id", "code"],
"columntypes": ["character(10)", "character(10)"],
"columnvalues": ["id1 ", "code2 "],
"oldkeys": {
"keynames": ["id", "code"],
"keytypes": ["character(10)", "character(10)"],
"keyvalues": ["id1 ", "code1 "]
}
}
]
}
The Kafka Connect environment comes in two flavors: as part of a plain Kafka installation or as part of the Confluent Platform installation. In any case, if you’ve already installed Zookeeper, Kafka, and Kafka Connect, then using one of Debezium’s connectors is easy. Simply download one or more connector plugin archives, extract their files into your Kafka Connect environment, and add the parent directory of the extracted plugin(s) to Kafka Connect’s plugin path.
Download the Kafka archive (version kafka_2.12-2.1.1 is used) and extract it into a directory (e.g. /opt):
$ tar xzf kafka_2.12-2.1.1.tgz -C /opt
ℹ️ In the rest of the document, the Kafka installation directory is referred to as <KAFKA_INST>.
Download the Confluent Platform (community version 5.1.2-2.11 is used) and extract it into a directory (e.g. /opt) as described in the Install using ZIP and TAR Archives manual:
$ curl -O http://packages.confluent.io/archive/5.1/confluent-community-5.1.2-2.11.tar.gz
$ tar xzf confluent-community-5.1.2-2.11.tar.gz -C /opt
ℹ️ In the rest of the document, the Confluent installation directory is referred to as <CONFLUENT_INST>.
Before starting the installation, the Kafka Connect worker’s plugin path should be set. The plugin path is a comma-separated list of paths to directories that contain Kafka Connect plugins. It is defined in the Kafka Connect configuration file via the plugin.path parameter.
The configuration file has two flavors depending on the Kafka Connect execution mode: standalone and distributed. In standalone mode, the connect-standalone.properties file is used, whereas in distributed mode the connect-distributed.properties file is used. The files are located under the directories <KAFKA_INST>/config and <CONFLUENT_INST>/etc/kafka for the plain Kafka Connect and the Confluent Platform installations respectively.
In order to show both standalone and distributed execution modes, the plain Kafka Connect will be started in standalone mode whereas the Confluent Platform will be started in distributed mode (which is the default one).
ℹ️ More information about Kafka connectors can be found in the official Kafka Connect documentation.
- Open the <KAFKA_INST>/config/connect-standalone.properties file and set the plugin.path parameter appropriately.
- Download the Postgres connector and extract it under the Kafka Connect worker’s plugin path as defined previously (see the sketch below).
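A minimal sketch of the two steps above, assuming /opt/connectors is chosen as the plugin directory and that the PostgreSQL connector plugin archive has already been downloaded from the Debezium releases page (the archive name may differ for your version):
$ mkdir -p /opt/connectors
$ tar xzf debezium-connector-postgres-0.9.2.Final-plugin.tar.gz -C /opt/connectors
# in <KAFKA_INST>/config/connect-standalone.properties:
plugin.path=/opt/connectors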
- Open the <CONFLUENT_INST>/etc/kafka/connect-distributed.properties file and set the plugin.path parameter appropriately (the default value of the parameter points to <CONFLUENT_INST>/share/java).
- Download the Postgres connector and extract it under the Kafka Connect worker’s plugin path as defined previously.
Connector configurations are key-value mappings used to set up connectors. For standalone mode, these are defined in a properties file and passed to the Connect process on the command line. In distributed mode, they will be included in the JSON payload sent over the REST API.
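In distributed mode, for example, the JSON payload is submitted to the Kafka Connect REST interface. A minimal sketch, assuming the worker listens on the default port 8083 and the configuration has been saved into the JSON file defined later in this document:
$ curl -X POST -H "Content-Type: application/json" \
      --data @dbz-test-connector.json \
      http://localhost:8083/connectors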
Generally, the Debezium configuration file includes parameters related to:
- the database connectivity (database.hostname, database.port, database.user, database.password),
- the Kafka message key and value format (key.converter, value.converter),
- what data should be included in the Kafka message (key.converter.schemas.enable, value.converter.schemas.enable),
- the message structure (event flattening; see the sketch after this list).
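Event flattening is implemented as a Kafka Connect single message transform (SMT) shipped with Debezium. The following is only a sketch: the transform class name is an assumption based on Debezium 0.9-era documentation (later versions renamed it to io.debezium.transforms.ExtractNewRecordState), so verify it against the documentation of the connector version in use.
# sketch: unwrap the Debezium change-event envelope so that only the row state is emitted
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope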
Here are some parameters with their values used in the current example; you can modify the values according to your needs:
- name: dbz-test-connector, the logical name of the connector
- connector.class: io.debezium.connector.postgresql.PostgresConnector, the Debezium PostgreSQL connector class
- plugin.name: wal2json, the logical decoding output plugin in use
- key.converter: org.apache.kafka.connect.json.JsonConverter, the converter used to serialize the Kafka message key as JSON
- value.converter: org.apache.kafka.connect.json.JsonConverter, the converter used to serialize the Kafka message value as JSON
- database.dbname: test, the name of the PostgreSQL database from which to stream the changes
- database.server.name: DBTestServer, the logical name that identifies and provides a namespace for the particular PostgreSQL database server/cluster being monitored
In the current example, the following configuration for the Debezium connector is used. Modify the parameter values, if needed, and save the configuration into a file (e.g. <KAFKA_INST>/config/dbz-test-connector.properties).
name=dbz-test-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
plugin.name=wal2json
database.hostname=localhost
database.port=5432
database.user=postgres
database.password=password
database.dbname=test
database.server.name=DBTestServer
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
- Start the Zookeeper server
$ cd <KAFKA_INST>
$ bin/zookeeper-server-start.sh config/zookeeper.properties
- Start the Kafka server
$ bin/kafka-server-start.sh config/server.properties
- Deploy the Debezium connector
$ bin/connect-standalone.sh config/connect-standalone.properties config/dbz-test-connector.properties
- Check the Debezium connector status
$ curl -s localhost:8083/connectors/dbz-test-connector/status | jq
{
"name": "dbz-test-connector",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "127.0.0.1:8083"
}
],
"type": "source"
}
ℹ️ More information about the Kafka Connect REST API can be found in the official documentation.
In case of using the Confluent Platform, the configuration parameters of the Debezium connector are the same as for the plain Kafka Connect deployment. The only difference is that the parameters are represented in JSON format. Modify the parameter values, if needed, and save the configuration into a file (e.g. <CONFLUENT_INST>/etc/kafka/dbz-test-connector.json).
{
"name": "dbz-test-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"plugin.name": "wal2json",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname" : "test",
"database.server.name": "DBTestServer",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable": "false"
}
}
Use the Confluent CLI to start the Confluent platform services and load the Debezium Connector.
- Start the Confluent Platform services
$ cd <CONFLUENT_INST>
$ bin/confluent start
- Deploy the Debezium connector
$ bin/confluent load dbz-test-connector -d <CONFLUENT_INST>/etc/kafka/dbz-test-connector.json
{
"name": "dbz-test-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"plugin.name": "wal2json",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "test",
"database.server.name": "DBTestServer",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"name": "dbz-test-connector"
},
"tasks": [],
"type": null
}
- Check the Debezium connector status
$ bin/confluent status dbz-test-connector
{
"name": "dbz-test-connector",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "127.0.0.1:8083"
}
],
"type": "source"
}
ℹ️ More information about the Confluent CLI can be found in the official documentation.
The Debezium PostgreSQL connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic. By default, the Kafka topic name takes the form serverName.schemaName.tableName, where serverName is the logical name of the connector as specified with the database.server.name configuration property, schemaName is the name of the database schema where the operation occurred, and tableName is the name of the database table on which the operation occurred. In our case, the name of the created Kafka topic is DBTestServer.public.test_table.
Most PostgreSQL servers are configured not to retain the complete history of the database in the WAL segments, so the PostgreSQL connector would be unable to see the entire history of the database by simply reading the WAL. Therefore, by default, the connector performs an initial consistent snapshot of the database upon its first startup.
💡 For the needs of the tests, it is recommended to use kafkacat, a command-line utility that helps to test and debug Apache Kafka deployments. It can be used to produce, consume, and list topic and partition information for Kafka. You can download the latest version and install it by following the instructions described in its documentation. In the rest of the document, the kafkacat installation directory is referred to as <KAFKACAT_INST>.
In order to check that the Debezium connector works as expected, the following tests can be performed:
- Check the topic(s) creation for the database table(s)
Verify the creation of Kafka topics for the tables that the connector is applied to (test_table in our example):
$ <KAFKACAT_INST>/kafkacat -b localhost:9092 -L | grep DBTestServer
Alternatively, the kafka-topics Kafka command-line tool can be used for the plain Kafka Connect and Confluent Platform deployments respectively, as follows:
$ <KAFKA_INST>/bin/kafka-topics.sh --list --zookeeper localhost:2181 | grep DBTestServer
$ <CONFLUENT_INST>/bin/kafka-topics --list --zookeeper localhost:2181 | grep DBTestServer
The output of the above command should include a topic named DBTestServer.public.test_table.
- Check the initial topic(s) content
Check the Kafka topic messages of the respective table (the DBTestServer.public.test_table topic in our case); they should contain an initial snapshot of the database (the output is formatted in order to be more readable):
$ <KAFKACAT_INST>/kafkacat -b localhost:9092 -t DBTestServer.public.test_table -C -o beginning -f 'Key: %k\nValue: %s\n'
Key: {"id":"id1 "}
Value: {
"before":null,
"after":{"id":"id1 ","code":"code2 "},
"source":{
"version":"0.9.2",
"name":"DBTestServer",
"db":"test",
"ts_usec":1537191190816000,
"txId":934261,
"lsn":3323094832,
"schema":"public",
"table":"test_table",
"snapshot":true,
"last_snapshot_record":false},
"op":"r",
"ts_ms":1537191190817
}
% Reached end of topic DBTestServer.public.test_table [0] at offset 1
Alternatively, the kafka-console-consumer Kafka command-line tool can be used for the plain Kafka Connect and Confluent Platform deployments respectively, as follows:
$ <KAFKA_INST>/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic DBTestServer.public.test_table --from-beginning --property print.key=true
$ <CONFLUENT_INST>/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic DBTestServer.public.test_table --from-beginning --property print.key=true
Indeed, the connector took an initial database snapshot (the test_table contains only one record).
- Monitor the Kafka messages produced on table changes
Monitor the messages added to the Kafka topic when the respective table changes (e.g. on insert, update, and deletion of a record):
$ <KAFKACAT_INST>/kafkacat -b localhost:9092 -t DBTestServer.public.test_table -C -o beginning -f 'Key: %k\nValue: %s\n'
Alternatively, the kafka-console-consumer Kafka command-line tool can be used as described previously.
Here are the DML operations at the test_table and the respective Kafka messages added to the DBTestServer.public.test_table topic (the messages are formatted in order to be more readable):
test=# INSERT INTO test_table (id, code) VALUES('id2', 'code2');
Key: {"id":"id2 "}
Value: {
"before":null,
"after":{"id":"id2 ","code":"code2 "},
"source":{
"version":"0.9.2",
"name":"DBTestServer",
"db":"test",
"ts_usec":1537262994443180000,
"txId":934262,
"lsn":3323107556,
"schema":"public",
"table":"test_table",
"snapshot":true,
"last_snapshot_record":true},
"op":"c",
"ts_ms":1537262994604
}
test=# update test_table set code='code3' where id='id2';
Key: {"id":"id2 "}
Value: {
"before":{"id":"id2 ","code":null},
"after":{"id":"id2 ","code":"code3 "},
"source":{
"version":"0.9.2",
"name":"DBTestServer",
"db":"test",
"ts_usec":1537263061440799000,
"txId":934263,
"lsn":3323108190,
"schema":"public",
"table":"test_table",
"snapshot":true,
"last_snapshot_record":true},
"op":"u",
"ts_ms":1537263061474
}
test=# delete from test_table where id='id2';
Key: {"id":"id2 "}
Value: {
"before":{"id":"id2 ","code":null},
"after":null,
"source":{
"version":"0.9.2",
"name":"DBTestServer",
"db":"test",
"ts_usec":1537263155358693000,
"txId":934264,
"lsn":3323108208,
"schema":"public",
"table":"test_table",
"snapshot":true,
"last_snapshot_record":true},
"op":"d",
"ts_ms":1537263155374}
Key: {"id":"id2 "}
Value:
Debezium’s PostgreSQL connector always follows a delete event with a special tombstone event that has the same key but a null value, in order to allow removal of all messages with the same key during Kafka log compaction. This behavior can be controlled via the connector parameter tombstones.on.delete.
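If log compaction is not used for the topic and the tombstone messages are not wanted, the behavior can be switched off; a minimal sketch (added to the connector configuration shown earlier):
# suppress the extra tombstone message emitted after each delete
tombstones.on.delete=false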
Written by John Psoroulas, 2018-2019.