@R4wm
Last active January 7, 2025 17:16
postgres exercise
┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 10:51:31 ~ master
└─ λ  docker-compose up -d 
Creating network "postgres_debezium_cdc_default" with the default driver
Pulling postgres (debezium/postgres:13)...
13: Pulling from debezium/postgres
fa0650a893c2: Pull complete
3099e59785c8: Pull complete

yay images

┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 10:54:58 ~ master
└─ λ  docker container ls 
CONTAINER ID   IMAGE                                    COMMAND                  CREATED          STATUS          PORTS                                                                     NAMES
0d20a8fef3c4   debezium/connect:1.4                     "/docker-entrypoint.…"   33 seconds ago   Up 32 seconds   8778/tcp, 9092/tcp, 0.0.0.0:8083->8083/tcp, :::8083->8083/tcp, 9779/tcp   postgres_debezium_cdc_debezium_1
ffbe59b3d11b   confluentinc/cp-schema-registry:5.5.3    "/etc/confluent/dock…"   33 seconds ago   Up 32 seconds   0.0.0.0:8081->8081/tcp, :::8081->8081/tcp                                 postgres_debezium_cdc_schema-registry_1
b9034b4258d5   confluentinc/cp-enterprise-kafka:5.5.3   "/etc/confluent/dock…"   33 seconds ago   Up 32 seconds   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp                                 postgres_debezium_cdc_kafka_1
56b956539230   debezium/postgres:13                     "docker-entrypoint.s…"   35 seconds ago   Up 7 seconds    0.0.0.0:5432->5432/tcp, :::5432->5432/tcp                                 postgres_debezium_cdc_postgres_1
f149af2c21b6   confluentinc/cp-zookeeper:5.5.3          "/etc/confluent/dock…"   35 seconds ago   Up 32 seconds   2181/tcp, 2888/tcp, 3888/tcp                                              postgres_debezium_cdc_zookeeper_1

┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 10:55:03 ~ master

go into the postgres container

┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 10:55:05 ~ master
└─ λ  docker exec -it 56b956539230 /bin/bash


root@56b956539230:/# psql -U docker -d exampledb -W
Password: 
psql (13.16 (Debian 13.16-1.pgdg110+1))
Type "help" for help.

exampledb=# 
exampledb=# create table student(id integer primary key, name varchar); 


set up replica identity on the table, so update and delete events carry the full previous row

exampledb=# alter table public.student replica identity full; 
ALTER TABLE
exampledb=# 

set up debezium to connect to postgresql

┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 11:04:00 ~ master
└─ λ  cat debezium.json | jq . 
{
  "name": "exampledb-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "docker",
    "database.password": "docker",
    "database.dbname": "exampledb",
    "database.server.name": "postgres",
    "table.include.list": "public.student"
  }
}

┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 11:04:02 ~ master
└─ λ  curl -i -XPOST -H"Accept:application/json" -H"Content-type:application/json" localhost:8083/connectors/ --data @debezium.json
HTTP/1.1 201 Created
Date: Fri, 03 Jan 2025 16:04:07 GMT
Location: http://localhost:8083/connectors/exampledb-connector
Content-Type: application/json
Content-Length: 403
Server: Jetty(9.4.33.v20201020)

{"name":"exampledb-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","plugin.name":"pgoutput","database.hostname":"postgres","database.port":"5432","database.user":"docker","database.password":"docker","database.dbname":"exampledb","database.server.name":"postgres","table.include.list":"public.student","name":"exampledb-connector"},"tasks":[],"type":"source"}
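
The same registration can be scripted. A minimal sketch using only the Python standard library, assuming the Kafka Connect REST endpoint is reachable at localhost:8083 as in the curl call above. `build_connector_request` and `register` are hypothetical helper names, not part of any library.

```python
import json
import urllib.request

# Connector definition, copied from debezium.json above.
CONNECTOR = {
    "name": "exampledb-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "docker",
        "database.password": "docker",
        "database.dbname": "exampledb",
        "database.server.name": "postgres",
        "table.include.list": "public.student",
    },
}


def build_connector_request(connector, base_url="http://localhost:8083"):
    """Build (but do not send) the POST /connectors/ request."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/",
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


def register(connector):
    """Send the request; needs the Connect container to be up."""
    with urllib.request.urlopen(build_connector_request(connector)) as response:
        return response.status, json.loads(response.read())
```

Calling `register(CONNECTOR)` against the running stack should give the same 201 response as the curl call; registering the same name a second time is rejected by Kafka Connect, which `urlopen` surfaces as an `HTTPError`.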
┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 11:04:07 ~ master

Tail the kafka topic to see when changes are made to the postgresql public.student table

# looking at the networks available
┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 11:09:21 ~ master
└─ λ  docker network ls 
NETWORK ID     NAME                            DRIVER    SCOPE
10064cba43b1   bridge                          bridge    local
7f8a065cc26d   host                            host      local
f50bc7147779   none                            null      local
d52b340cbe88   postgres_debezium_cdc_default   bridge    local


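# -s value=avro: message values are Avro-encoded (a compact binary format whose
# schemas live in the schema registry), so kafkacat needs -r to decode them
docker run --tty \
--network postgres_debezium_cdc_default \
confluentinc/cp-kafkacat \
kafkacat -b kafka:9092 -C \
-s key=s -s value=avro \
-r http://schema-registry:8081 \
-t postgres.public.student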

insert into postgresql and watch kafka tail logs

exampledb=# insert into student (id, name) values (1, 'batman');
INSERT 0 1
exampledb=# select * from student; 
 id |  name  
----+--------
  1 | batman
(1 row)

exampledb=# 

kafka tail logs

┌───( rmintz@rmintz-dt )-[ postgres_debezium_cdc ]
│ Fri 03 Jan 11:21:46 ~ master
└─ λ  docker run --tty --network postgres_debezium_cdc_default confluentinc/cp-kafkacat kafkacat -b kafka:9092 -C -s key=s -s value=avro -r http://schema-registry:8081 -t postgres.public.student
% Reached end of topic postgres.public.student [0] at offset 0
{"before": null, "after": {"Value": {"id": 1, "name": {"string": "batman"}}}, "source": {"version": "1.4.2.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1735921381636, "snapshot": {"string": "false"}, "db": "exampledb", "schema": "public", "table": "student", "txId": {"long": 491}, "lsn": {"long": 23890376}, "xmin": null}, "op": "c", "ts_ms": {"long": 1735921381761}, "transaction": null}
% Reached end of topic postgres.public.student [0] at offset 1


exampledb=# insert into student (id, name) values (2, 'link');
INSERT 0 1
exampledb=# 

{"before": null, "after": {"Value": {"id": 2, "name": {"string": "link"}}}, "source": {"version": "1.4.2.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1735921509289, "snapshot": {"string": "false"}, "db": "exampledb", "schema": "public", "table": "student", "txId": {"long": 492}, "lsn": {"long": 23891008}, "xmin": null}, "op": "c", "ts_ms": {"long": 1735921509649}, "transaction": null}
% Reached end of topic postgres.public.student [0] at offset 2

more fun with logging and changes

exampledb=# select * from student; 
 id |  name  
----+--------
  1 | batman
  2 | link
(2 rows)

exampledb=# update student set name='shyguy' where id=1;
UPDATE 1
exampledb=# select * from student; 
 id |  name  
----+--------
  2 | link
  1 | shyguy
(2 rows)

exampledb=# 

{"before": {"Value": {"id": 1, "name": {"string": "batman"}}}, "after": {"Value": {"id": 1, "name": {"string": "shyguy"}}}, "source": {"version": "1.4.2.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1735921658992, "snapshot": {"string": "false"}, "db": "exampledb", "schema": "public", "table": "student", "txId": {"long": 493}, "lsn": {"long": 23891432}, "xmin": null}, "op": "u", "ts_ms": {"long": 1735921659266}, "transaction": null}
% Reached end of topic postgres.public.student [0] at offset 3
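
The JSON that kafkacat prints wraps nullable fields in type tags such as {"string": "batman"} and {"Value": {...}}, which makes the events noisy to read. A rough sketch of flattening one line into op / before / after, assuming the tag names seen in the output above. `UNION_TAGS`, `unwrap` and `summarize` are names made up for this example, and a real column literally named like one of the tags would be unwrapped by mistake.

```python
import json

# Tag keys kafkacat uses for Avro union branches. This set is an assumption
# based on the events shown above, not an exhaustive list.
UNION_TAGS = {"string", "long", "int", "boolean", "double", "float", "Value"}


def unwrap(node):
    """Recursively strip single-key union wrappers such as {"string": "x"}."""
    if isinstance(node, dict):
        if len(node) == 1:
            key, inner = next(iter(node.items()))
            if key in UNION_TAGS:
                return unwrap(inner)
        return {key: unwrap(value) for key, value in node.items()}
    if isinstance(node, list):
        return [unwrap(item) for item in node]
    return node


def summarize(line):
    """Return a small dict for one kafkacat output line, or None for '% ...' status lines."""
    if line.startswith("%"):
        return None
    event = unwrap(json.loads(line))
    source = event["source"]
    return {
        "op": event["op"],  # c = create, u = update, d = delete
        "table": f'{source["schema"]}.{source["table"]}',
        "before": event["before"],
        "after": event["after"],
    }


# The update event from above, trimmed to the fields used here.
SAMPLE = (
    '{"before": {"Value": {"id": 1, "name": {"string": "batman"}}}, '
    '"after": {"Value": {"id": 1, "name": {"string": "shyguy"}}}, '
    '"source": {"db": "exampledb", "schema": "public", "table": "student", '
    '"txId": {"long": 493}}, "op": "u", "ts_ms": {"long": 1735921659266}, '
    '"transaction": null}'
)

print(summarize(SAMPLE))
```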

wrote an application that subscribes to the kafka topic of interest and captures the changes. from there you can do whatever you like with the data, such as forwarding it to another target or alerting on specific changes.

┌───( rmintz@rmintz-dt )-[ python ]
│ Tue 07 Jan 12:12:52 ~ master
└─ λ  source ~/virt3-postgres/bin/activate


┌───( rmintz@rmintz-dt )-[ python ]
│ Tue 07 Jan 12:15:56 ~ master
└─ λ  ./main.py 
this is response:  <Response [200]>
%4|1736270163.607|CONFWARN|rdkafka#consumer-1| [thrd:app]: Configuration property `sasl.mechanism` set to `PLAIN` but `security.protocol` is not configured for SASL: recommend setting `security.protocol` to SASL_SSL or SASL_PLAINTEXT
Waiting...
Waiting...
message.topic:  postgres.public.student
message key:  b'\x00\x00\x00\x00\x01\x0e'
value:  b'\x00\x00\x00\x00\x02\x02\x0e\x02\x0cpacman\x02\x0e\x02\x0cpacman\x161.4.2.Final\x14postgresql\x10postgres\xe6\xdb\xa1\x9c\x88e\x00\nfalse\x12exampledb\x0cpublic\x0estudent\x02\xfa\x07\x02\xc0\xc1\xe5\x16\x00\x02u\x02\xd8\xe1\xa1\x9c\x88e\x00'
value type:  <class 'bytes'>
this is response:  <Response [200]>
{'before': {'id': 7, 'name': 'pacman'}, 'after': {'id': 7, 'name': 'pacman'}, 'source': {'version': '1.4.2.Final', 'connector': 'postgresql', 'name': 'postgres', 'ts_ms': 1736270165747, 'snapshot': 'false', 'db': 'exampledb', 'schema': 'public', 'table': 'student', 'txId': 509, 'lsn': 23900256, 'xmin': None}, 'op': 'u', 'ts_ms': 1736270166124, 'transaction': None}
Waiting...
Waiting...
Waiting...
Waiting...
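
The raw bytes printed above follow the Confluent Schema Registry framing: one magic byte (0), a 4-byte big-endian schema ID, then the Avro-encoded payload. main.py is not shown here, so the following is only a sketch of that framing, not the author's code. It assumes the key schema is a record with a single non-null `id` field, which is what the key bytes suggest; `split_confluent_frame` and `read_long` are hypothetical names.

```python
import struct


def split_confluent_frame(raw):
    """Split a Schema Registry framed message into (schema_id, avro_payload)."""
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not a Confluent-framed message")
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id, raw[5:]


def read_long(payload, offset=0):
    """Decode one Avro int/long (zigzag varint); return (value, next_offset)."""
    shift = 0
    accum = 0
    while True:
        byte = payload[offset]
        offset += 1
        accum |= (byte & 0x7F) << shift  # low 7 bits carry data
        if not byte & 0x80:              # high bit clear means last byte
            break
        shift += 7
    return (accum >> 1) ^ -(accum & 1), offset  # undo zigzag encoding


key = b"\x00\x00\x00\x00\x01\x0e"  # the message key printed above
schema_id, payload = split_confluent_frame(key)
student_id, _ = read_long(payload)
print(schema_id, student_id)
```

The schema ID is what a consumer looks up in the schema registry before decoding the rest of the payload. Decoding the full value by hand is possible but tedious, so an Avro library is the usual choice for that part.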


R4wm commented Jan 3, 2025

next step: write a consumer in python that just prints the messages from the subscribed kafka topic
do this in a docker image
make sure the container is on the same network (postgres_debezium_cdc_default)
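
A minimal sketch of such a print-only loop. It assumes `consumer` behaves like `confluent_kafka.Consumer`: `poll(timeout)` returns `None` or a message object with `error()`, `topic()`, `key()` and `value()`. The client library is an assumption, and `print_messages` is a hypothetical name.

```python
def print_messages(consumer, max_polls, timeout=1.0):
    """Poll `consumer` up to `max_polls` times, print each message, return the count."""
    seen = 0
    for _ in range(max_polls):
        message = consumer.poll(timeout)
        if message is None:
            # Nothing arrived within the timeout.
            print("Waiting...")
            continue
        if message.error():
            print("consumer error: ", message.error())
            continue
        print("message.topic: ", message.topic())
        print("message key: ", message.key())
        print("value: ", message.value())
        seen += 1
    return seen
```

Inside a container on the compose network, the consumer would be created with something like `confluent_kafka.Consumer({"bootstrap.servers": "kafka:9092", "group.id": "student-printer", "auto.offset.reset": "earliest"})` followed by `consumer.subscribe(["postgres.public.student"])`; the group id is made up for illustration.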
