┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 10:51:31 ~ master
└─ λ docker-compose up -d
Creating network "postgres_debezium_cdc_default" with the default driver
Pulling postgres (debezium/postgres:13)...
13: Pulling from debezium/postgres
fa0650a893c2: Pull complete
3099e59785c8: Pull complete
yay images
┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 10:54:58 ~ master
└─ λ docker container ls
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0d20a8fef3c4 debezium/connect:1.4 "/docker-entrypoint.…" 33 seconds ago Up 32 seconds 8778/tcp, 9092/tcp, 0.0.0.0:8083->8083/tcp, :::8083->8083/tcp, 9779/tcp postgres_debezium_cdc_debezium_1
ffbe59b3d11b confluentinc/cp-schema-registry:5.5.3 "/etc/confluent/dock…" 33 seconds ago Up 32 seconds 0.0.0.0:8081->8081/tcp, :::8081->8081/tcp postgres_debezium_cdc_schema-registry_1
b9034b4258d5 confluentinc/cp-enterprise-kafka:5.5.3 "/etc/confluent/dock…" 33 seconds ago Up 32 seconds 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp postgres_debezium_cdc_kafka_1
56b956539230 debezium/postgres:13 "docker-entrypoint.s…" 35 seconds ago Up 7 seconds 0.0.0.0:5432->5432/tcp, :::5432->5432/tcp postgres_debezium_cdc_postgres_1
f149af2c21b6 confluentinc/cp-zookeeper:5.5.3 "/etc/confluent/dock…" 35 seconds ago Up 32 seconds 2181/tcp, 2888/tcp, 3888/tcp postgres_debezium_cdc_zookeeper_1
┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 10:55:03 ~ master
go into the postgres container
┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 10:55:05 ~ master
└─ λ docker exec -it 56b956539230 /bin/bash
root@56b956539230:/# psql -U docker -d exampledb -W
Password:
psql (13.16 (Debian 13.16-1.pgdg110+1))
Type "help" for help.
exampledb=#
exampledb=# create table student(id integer primary key, name varchar);
exampledb=# alter table public.student replica identity full;
ALTER TABLE
exampledb=#
┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 11:04:00 ~ master
└─ λ cat debezium.json | jq .
{
"name": "exampledb-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "docker",
"database.password": "docker",
"database.dbname": "exampledb",
"database.server.name": "postgres",
"table.include.list": "public.student"
}
}
┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 11:04:02 ~ master
└─ λ curl -i -XPOST H"Accept:application/json" -H"Content-type:application/json" localhost:8083/connectors/ --data @debezium.json
curl: (3) URL rejected: Port number was not a decimal number between 0 and 65535
HTTP/1.1 201 Created
Date: Fri, 03 Jan 2025 16:04:07 GMT
Location: http://localhost:8083/connectors/exampledb-connector
Content-Type: application/json
Content-Length: 403
Server: Jetty(9.4.33.v20201020)
{"name":"exampledb-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","plugin.name":"pgoutput","database.hostname":"postgres","database.port":"5432","database.user":"docker","database.password":"docker","database.dbname":"exampledb","database.server.name":"postgres","table.include.list":"public.student","name":"exampledb-connector"},"tasks":[],"type":"source"}
┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 11:04:07 ~ master
Tail the logs from kafka to see when changes are made to the postgressql public.student
table
# looking at the networks available
┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 11:09:21 ~ master
└─ λ docker network ls
NETWORK ID NAME DRIVER SCOPE
10064cba43b1 bridge bridge local
7f8a065cc26d host host local
f50bc7147779 none null local
d52b340cbe88 postgres_debezium_cdc_default bridge local
docker run --tty \
--network postgres_debezium_cdc_default \
confluentinc/cp-kafkacat \
kafkacat -b kafka:9092 -C \
-s key=s -s value=avro \ # todo, what is a v r o ?
-r http://schema-registry:8081 \
-t postgres.public.student
insert into postgresql and watch kafka tail logs
exampledb=# insert into student (id, name) values (1, 'batman');
INSERT 0 1
exampledb=# select * from student;
id | name
----+--------
1 | batman
(1 row)
exampledb=#
kafka tail logs
┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 11:21:46 ~ master
└─ λ docker run --tty --network postgres_debezium_cdc_default confluentinc/cp-kafkacat kafkacat -b kafka:9092 -C -s key=s -s value=avro -r http://schema-registry:8081 -t postgres.public.student
% Reached end of topic postgres.public.student [0] at offset 0
{"before": null, "after": {"Value": {"id": 1, "name": {"string": "batman"}}}, "source": {"version": "1.4.2.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1735921381636, "snapshot": {"string": "false"}, "db": "exampledb", "schema": "public", "table": "student", "txId": {"long": 491}, "lsn": {"long": 23890376}, "xmin": null}, "op": "c", "ts_ms": {"long": 1735921381761}, "transaction": null}
% Reached end of topic postgres.public.student [0] at offset 1
exampledb=# insert into student (id, name) values (2, 'link');
INSERT 0 1
exampledb=#
{"before": null, "after": {"Value": {"id": 2, "name": {"string": "link"}}}, "source": {"version": "1.4.2.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1735921509289, "snapshot": {"string": "false"}, "db": "exampledb", "schema": "public", "table": "student", "txId": {"long": 492}, "lsn": {"long": 23891008}, "xmin": null}, "op": "c", "ts_ms": {"long": 1735921509649}, "transaction": null}
% Reached end of topic postgres.public.student [0] at offset 2
more fun with logging and changes
exampledb=# select * from student;
id | name
----+--------
1 | batman
2 | link
(2 rows)
exampledb=# update student set name='shyguy' where id=1;
UPDATE 1
exampledb=# select * from student;
id | name
----+--------
2 | link
1 | shyguy
(2 rows)
exampledb=#
{"before": {"Value": {"id": 1, "name": {"string": "batman"}}}, "after": {"Value": {"id": 1, "name": {"string": "shyguy"}}}, "source": {"version": "1.4.2.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1735921658992, "snapshot": {"string": "false"}, "db": "exampledb", "schema": "public", "table": "student", "txId": {"long": 493}, "lsn": {"long": 23891432}, "xmin": null}, "op": "u", "ts_ms": {"long": 1735921659266}, "transaction": null}
% Reached end of topic postgres.public.student [0] at offset 3
wrote an application to sub to the kafka topic of interest and capture the changes, can do whatever you like with the data like send to another target or alert on specific changes..
┌───( rmintz@rmintz-dt )-[ python ]
│ Tue 07 Jan 12:12:52 ~ master
└─ λ source ~/virt3-postgres/bin/activate
┌───( rmintz@rmintz-dt )-[ python ]
│ Tue 07 Jan 12:15:56 ~ master
└─ λ ./main.py
this is response: <Response [200]>
%4|1736270163.607|CONFWARN|rdkafka#consumer-1| [thrd:app]: Configuration property `sasl.mechanism` set to `PLAIN` but `security.protocol` is not configured for SASL: recommend setting `security.protocol` to SASL_SSL or SASL_PLAINTEXT
Waiting...
Waiting...
message.topic: postgres.public.student
message key: b'\x00\x00\x00\x00\x01\x0e'
value: b'\x00\x00\x00\x00\x02\x02\x0e\x02\x0cpacman\x02\x0e\x02\x0cpacman\x161.4.2.Final\x14postgresql\x10postgres\xe6\xdb\xa1\x9c\x88e\x00\nfalse\x12exampledb\x0cpublic\x0estudent\x02\xfa\x07\x02\xc0\xc1\xe5\x16\x00\x02u\x02\xd8\xe1\xa1\x9c\x88e\x00'
value type: <class 'bytes'>
this is response: <Response [200]>
{'before': {'id': 7, 'name': 'pacman'}, 'after': {'id': 7, 'name': 'pacman'}, 'source': {'version': '1.4.2.Final', 'connector': 'postgresql', 'name': 'postgres', 'ts_ms': 1736270165747, 'snapshot': 'false', 'db': 'exampledb', 'schema': 'public', 'table': 'student', 'txId': 509, 'lsn': 23900256, 'xmin': None}, 'op': 'u', 'ts_ms': 1736270166124, 'transaction': None}
Waiting...
Waiting...
Waiting...
Waiting...
turorial ref: https://www.youtube.com/redirect?event=video_description&redir_token=QUFFLUhqblJ4NkR6NlRITzRFQkdBcmREblZwaXBFazFQd3xBQ3Jtc0trQzlGMVpDY2xBOW1XQ1JNUlJ2cUc1UHkyWVFOeEFDN2VYNWkydkpOeHZmVWJYY1JlQzc4OVQtRmgyUEV1bEdyOV9RTG5EV19LUVVqSTZFaEx4QVlNeXNvOUZ0NjR3aTdfdmVPT0xvRGQxSkZraDdJbw&q=https%3A%2F%2Fgithub.com%2Firtiza07%2Fpostgres_debezium_cdc&v=YZRHqRznO-o
github origin: https://github.com/irtiza07/postgres_debezium_cdc.git