Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save farunurisonmez/32970e8acd32acb16e9b1c9c6dc6b56f to your computer and use it in GitHub Desktop.
Save farunurisonmez/32970e8acd32acb16e9b1c9c6dc6b56f to your computer and use it in GitHub Desktop.
This Gist contains a Dockerized configuration for setting up a Kafka, ZooKeeper, Logstash, and Debezium-based data pipeline on Ubuntu 24.04.1. It includes configurations for replicating PostgreSQL data to MongoDB and Elasticsearch, using Debezium as the CDC (Change Data Capture) tool to keep these data stores synchronized.
# /opt/cdc/config/debezium/application.properties
# Debezium Server configuration: captures row changes from PostgreSQL and
# publishes them as events to Kafka.

# NOTE(review): group.id and the *.storage.topic keys are Kafka Connect
# distributed-worker settings. Offsets in this file are stored in a local
# file (offset.storage.file.filename below), so these look unused by
# Debezium Server - confirm before relying on them.
group.id=1
debezium.source.config.storage.topic=debezium_configs
debezium.source.offset.storage.topic=debezium_offsets
debezium.source.status.storage.topic=debezium_statuses

# Sink: every debezium.sink.kafka.producer.* key is passed through to the
# Kafka producer with that prefix removed.
debezium.sink.type=kafka
debezium.sink.kafka.producer.bootstrap.servers=172.21.0.3:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Event format. Debezium Server selects serialization with debezium.format.*.
# Kafka Connect converter classes (key.converter / value.converter) are not
# used by Debezium Server. Under the producer prefix they would only be
# forwarded to the producer as unknown settings. json is also the default.
debezium.format.key=json
debezium.format.value=json

# Offsets: kept in a local file; flush interval of 0 ms.
debezium.source.offset.storage.file.filename=debezium_data/offsets.dat
debezium.source.offset.flush.interval.ms=0

# Source: PostgreSQL via the built-in pgoutput logical decoding plugin.
debezium.source.plugin.name=pgoutput
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=172.18.120.210
debezium.source.database.port=5432
# NOTE(review): plaintext test credentials - move to a secret store / env var outside local dev.
debezium.source.database.user=testuser
debezium.source.database.password=testpassword
debezium.source.database.dbname=test_db
# Topics are named <topic.prefix>.<schema>.<table>, e.g. cdc.public.users_user.
debezium.source.topic.prefix=cdc
debezium.source.table.include.list=public.groups_group,public.groups_groupmember,public.groups_groupmessage,public.users_user
# docker-compose.yml - ZooKeeper, Kafka, Kafka UI, Debezium Server,
# Debezium Kafka Connect and Debezium UI on one bridge network with static IPs.
# ${...} values are resolved from the project's .env file.
services:
  zookeeper:
    image: ${ZOOKEEPER_IMAGE}
    container_name: ${ZOOKEEPER_CONTAINER_NAME}
    environment:
      # NOTE(review): ALLOW_ANONYMOUS_LOGIN is a Bitnami-image setting;
      # presumably ignored by debezium/zookeeper - confirm.
      - ALLOW_ANONYMOUS_LOGIN=${ZOOKEEPER_ALLOW_ANONYMOUS_LOGIN}
    ports:
      - "${ZOOKEEPER_PORT}:${ZOOKEEPER_PORT}"
    networks:
      cdb_network:
        ipv4_address: ${ZOOKEEPER_IPV4_ADDRESS}  # Static IP address is assigned here

  kafka:
    image: ${KAFKA_IMAGE}
    platform: linux/x86_64
    container_name: ${KAFKA_CONTAINER_NAME}
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=${ZOOKEEPER_IPV4_ADDRESS}:${ZOOKEEPER_PORT}
      - KAFKA_BROKER_ID=${KAFKA_BROKER_ID}
      # NOTE(review): ALLOW_PLAINTEXT_LISTENER is a Bitnami-image setting;
      # presumably ignored by debezium/kafka - confirm.
      - ALLOW_PLAINTEXT_LISTENER=${KAFKA_ALLOW_PLAINTEXT_LISTENER}
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP}
      # The debezium/kafka image turns KAFKA_<NAME> into the broker property
      # <name> (prefix dropped, lowercased, "_" -> "."). Bitnami-style
      # KAFKA_CFG_* names would become "cfg.listeners", which Kafka ignores,
      # leaving the PLAINTEXT_HOST listener on 29092 unconfigured. The .env
      # keys keep their KAFKA_CFG_* names; only the container variables differ.
      - KAFKA_LISTENERS=${KAFKA_CFG_LISTENERS}
      - KAFKA_ADVERTISED_LISTENERS=${KAFKA_CFG_ADVERTISED_LISTENERS}
    ports:
      - "${KAFKA_PORT}:${KAFKA_PORT}"
      # Listener intended for clients running on the Docker host.
      - "29092:29092"
    networks:
      cdb_network:
        ipv4_address: ${KAFKA_IPV4_ADDRESS}  # Static IP address is assigned here

  kafka-ui:
    image: ${KAFKA_UI_IMAGE}
    container_name: ${KAFKA_UI_CONTAINER_NAME}
    depends_on:
      - kafka
    environment:
      - KAFKA_CLUSTERS_0_NAME=${KAFKA_UI_CLUSTER_NAME}
      - KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS=${KAFKA_UI_BOOTSTRAP_SERVERS}
    ports:
      - "${KAFKA_UI_PORT}:${KAFKA_UI_PORT}"
    networks:
      cdb_network:
        ipv4_address: ${KAFKA_UI_IPV4_ADDRESS}  # Static IP address is assigned here

  debezium:
    image: ${DEBEZIUM_IMAGE}
    container_name: ${DEBEZIUM_CONTAINER_NAME}
    depends_on:
      - kafka
    environment:
      # NOTE(review): presumably unused by debezium/server, which loads
      # application.properties from /debezium/config - confirm.
      - CONFIG_FILE=${DEBEZIUM_CONFIG_PATH}
    volumes:
      # Host directory holding Debezium Server's application.properties.
      - ${DEBEZIUM_CONFIG_VOLUME}:/debezium/config
    networks:
      cdb_network:
        ipv4_address: ${DEBEZIUM_IPV4_ADDRESS}  # Static IP address is assigned here
    ports:
      - "9090:8080"  # Host 9090 -> Container 8080

  debezium-connect:
    image: ${DEBEZIUM_CONNECT_IMAGE}
    container_name: ${DEBEZIUM_CONNECT_CONTAINER_NAME}
    depends_on:
      - kafka
    environment:
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=debezium-configs
      - OFFSET_STORAGE_TOPIC=debezium-offsets
      # NOTE(review): STATUS_STORAGE_TOPIC is not set, so the image presumably
      # falls back to its default status topic - consider setting it.
      - BOOTSTRAP_SERVERS=${KAFKA_ADVERTISED_LISTENER}
      - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter
      - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      # NOTE(review): internal.key/value.converter were removed in Kafka 3.0
      # (KIP-738); these two settings are likely ignored.
      - CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter
      - CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_PLUGIN_PATH=/kafka/connect
    ports:
      - "${DEBEZIUM_CONNECT_PORT}:${DEBEZIUM_CONNECT_PORT}"
    volumes:
      # The host directory replaces the image's /kafka/connect, so it must
      # contain every connector plugin Kafka Connect should load.
      - ./kafka/connect:/kafka/connect
    networks:
      cdb_network:
        ipv4_address: ${DEBEZIUM_CONNECT_IPV4_ADDRESS}

  debezium-ui:
    image: ${DEBEZIUM_UI_IMAGE}
    container_name: ${DEBEZIUM_UI_CONTAINER_NAME}
    depends_on:
      - debezium-connect
    ports:
      - "${DEBEZIUM_UI_PORT}:${DEBEZIUM_UI_PORT}"
    networks:
      cdb_network:
        ipv4_address: ${DEBEZIUM_UI_IPV4_ADDRESS}
    environment:
      # The UI listens on the same port that is published above.
      - QUARKUS_HTTP_PORT=${DEBEZIUM_UI_PORT}
      - KAFKA_CONNECT_URIS=${DEBEZIUM_UI_KAFKA_CONNECT_URIS}

networks:
  cdb_network:
    driver: bridge
    ipam:
      config:
        - subnet: "172.21.0.0/16"  # Subnet configuration for the network

Debezium MongoDB and PostgreSQL CDC Configuration

Table of Contents

System Requirements

Make sure your system meets the following requirements:

  • Docker 20.10 or later
  • Docker Compose 1.27 or later
  • A machine with at least 4GB of RAM

Project Structure

/opt/cdc
├── config
│   ├── connectors
│   │   ├── mongodb-connector.json
│   │   ├── postgresql-connector.json
│   │   └── register_connector.sh
│   ├── debezium
│   │   └── application.properties
├── kafka
│   ├── connect
│   │   ├── debezium-connector-db2
│   │   ├── debezium-connector-ibmi
│   │   ├── debezium-connector-informix
│   │   ├── debezium-connector-jdbc
│   │   ├── debezium-connector-mariadb
│   │   ├── debezium-connector-mongodb
│   │   │   ├── CHANGELOG.md
│   │   │   ├── CONTRIBUTE.md
│   │   │   ├── COPYRIGHT.txt
│   │   │   ├── LICENSE-3rd-PARTIES.txt
│   │   │   ├── LICENSE.txt
│   │   │   ├── README.md
│   │   │   ├── README_JA.md
│   │   │   ├── README_KO.md
│   │   │   ├── README_ZH.md
│   │   │   ├── bson-4.11.0.jar
│   │   │   ├── bson-record-codec-4.11.0.jar
│   │   │   ├── debezium-api-3.0.0.Final.jar
│   │   │   ├── debezium-connector-mongodb-3.0.0.Final.jar
│   │   │   ├── debezium-core-3.0.0.Final.jar
│   │   │   ├── mongo-kafka-connect-1.15.0-all.jar
│   │   │   ├── mongodb-driver-core-4.11.0.jar
│   │   │   └── mongodb-driver-sync-4.11.0.jar
│   │   ├── debezium-connector-mysql
│   │   ├── debezium-connector-oracle
│   │   ├── debezium-connector-postgres
│   │   ├── debezium-connector-spanner
│   │   ├── debezium-connector-sqlserver
│   │   └── debezium-connector-vitess
├── logs
│   └── kafka
├── .env
└── docker-compose.yml

Step 1: Create the .env File

Create a .env file at the root of your project directory to specify environment variables required by Docker Compose.

Step 2: Docker Compose Configuration

Ensure that your docker-compose.yml is set up to handle Zookeeper, Kafka, Debezium, and their respective UIs.

Step 3: Running Kafka and Debezium with Docker Compose

Start Kafka and Debezium containers using:

docker-compose up -d

Step 4: Installing MongoDB Kafka Connector

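To enable CDC for MongoDB, install the MongoDB Kafka Connector. The `./kafka/connect:/kafka/connect` bind mount in docker-compose.yml hides the connector plugins that ship with the debezium/connect image, so you must copy those plugins to the host first. Copy them from a container that does not yet have that mount; find its ID with `docker ps`. When running from /opt/cdc/kafka/connect, use `/kafka/connect/.` as the source path. The plugin folders then land directly in that directory rather than in a nested `connect` folder: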

5cad0718eaa8   debezium/connect:3.0.0.Final                           "/docker-entrypoint.…"   About an hour ago   Up About an hour   8778/tcp, 0.0.0.0:8083->8083/tcp, [::]:8083->8083/tcp, 9092/tcp                                debezium-kafka-connect
root@DESKTOP-045RLF9:/opt/cdc/kafka/connect# docker cp 5cad0718eaa8:/kafka/connect/ ./

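Then download the MongoDB Kafka Connector JAR into the host's debezium-connector-mongodb plugin folder. The bind mount makes the JAR visible to the Debezium Kafka Connect container under /kafka/connect. Restart that container so Kafka Connect loads the new plugin: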

root@DESKTOP-045RLF9:/opt/cdc/kafka/connect/debezium-connector-mongodb# wget https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.15.0/mongo-kafka-connect-1.15.0-all.jar

Step 5: Configuring PostgreSQL and MongoDB CDC

Create connector configurations for PostgreSQL and MongoDB and submit them to the Kafka Connect REST API.

PostgreSQL CDC Connector Configuration

{
  "name": "postgres-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "password",
    "database.dbname": "mydatabase",
    "topic.prefix": "dbserver1",
    "plugin.name": "pgoutput"
  }
}

MongoDB CDC Connector Configuration

{
  "name": "mongodb-cdc",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "topic.prefix": "mongo",
    "connection.uri": "mongodb://mongo:27017",
    "database": "mydatabase",
    "collection": "mycollection"
  }
}

Step 6: Shutting Down Kafka and Debezium

To stop the services, run:

docker-compose down

Troubleshooting

If Kafka or Debezium fail to start, check logs using:

docker logs debezium-kafka-connect

Conclusion

This setup streams real-time change data capture (CDC) events from PostgreSQL to MongoDB using Debezium and Kafka.

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-document_type

https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html

# Zookeeper
ZOOKEEPER_IMAGE=debezium/zookeeper:3.0.0.Final
ZOOKEEPER_CONTAINER_NAME=zookeeper
ZOOKEEPER_PORT=2181
ZOOKEEPER_IPV4_ADDRESS=172.21.0.2
ZOOKEEPER_ALLOW_ANONYMOUS_LOGIN=yes
# Kafka
KAFKA_IMAGE=debezium/kafka:3.0.0.Final
KAFKA_CONTAINER_NAME=kafka
KAFKA_PORT=9092
KAFKA_IPV4_ADDRESS=172.21.0.3
KAFKA_BROKER_ID=1
KAFKA_ALLOW_PLAINTEXT_LISTENER=yes
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
# Containers on cdb_network reach the broker as kafka:9092; the Docker host uses localhost:29092.
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://0.0.0.0:29092
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_ADVERTISED_LISTENER=PLAINTEXT://kafka:9092
# NOTE(review): not referenced by docker-compose.yml, which builds ZOOKEEPER_CONNECT
# from ZOOKEEPER_IPV4_ADDRESS and ZOOKEEPER_PORT.
KAFKA_ZOOKEEPER_CONNECT=172.21.0.2:2181
# Kafka Volumes
# NOTE(review): the Kafka volume and user variables are not referenced by docker-compose.yml.
KAFKA_DATA_VOLUME=./data/kafka
KAFKA_LOGS_VOLUME=./logs/kafka
KAFKA_CONFIG_VOLUME=./config/kafka
# Kafka User
KAFKA_USER_ID=1000
KAFKA_GROUP_ID=1000
# Kafka UI
KAFKA_UI_IMAGE=provectuslabs/kafka-ui:latest
KAFKA_UI_CONTAINER_NAME=kafka-ui
KAFKA_UI_PORT=8080
KAFKA_UI_IPV4_ADDRESS=172.21.0.4
KAFKA_UI_CLUSTER_NAME=local
# Interpolates the values defined above (supported by Docker Compose v2 .env parsing).
KAFKA_UI_BOOTSTRAP_SERVERS=${KAFKA_IPV4_ADDRESS}:${KAFKA_PORT}
# Debezium
DEBEZIUM_IMAGE=debezium/server:3.0.0.Final
DEBEZIUM_CONTAINER_NAME=debezium
# NOTE(review): unused - the debezium service publishes "9090:8080" directly.
DEBEZIUM_PORT=8081
DEBEZIUM_IPV4_ADDRESS=172.21.0.5
DEBEZIUM_CONFIG_PATH=./application.properties
# Debezium Volumes
DEBEZIUM_CONFIG_VOLUME=./config/debezium
# Debezium Kafka Connect
DEBEZIUM_CONNECT_IMAGE=debezium/connect:3.0.0.Final
DEBEZIUM_CONNECT_CONTAINER_NAME=debezium-kafka-connect
DEBEZIUM_CONNECT_PORT=8083
# NOTE(review): unused - debezium-connect takes BOOTSTRAP_SERVERS from KAFKA_ADVERTISED_LISTENER.
DEBEZIUM_CONNECT_BOOTSTRAP_SERVERS=172.21.0.3:9092
DEBEZIUM_CONNECT_IPV4_ADDRESS=172.21.0.6
# Debezium UI
DEBEZIUM_UI_IMAGE=debezium/debezium-ui:latest
DEBEZIUM_UI_CONTAINER_NAME=debezium-ui
DEBEZIUM_UI_PORT=9091
DEBEZIUM_UI_IPV4_ADDRESS=172.21.0.7
DEBEZIUM_UI_KAFKA_CONNECT_URIS=http://172.21.0.6:8083
# Shared Volumes
# Deliberately not named DEBEZIUM_CONFIG_PATH: repeating a key in .env makes the
# later definition override the earlier one (./application.properties above).
DEBEZIUM_SHARED_CONFIG_PATH=./debezium/conf
DEBEZIUM_CONNECTORS_PATH=./debezium/connectors
{
  "name": "mongodb-sink-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "5",
    "topics": "dbserver1.account.users",
    "connection.uri": "mongodb://testuser:<password>@<mongodb-host>:27017",
    "database": "test_db",
    "collection": "test_collection",
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.postgres.PostgresHandler"
  }
}
{
  "name": "postgresql-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "172.18.120.210",
    "database.port": "5432",
    "database.user": "testuser",
    "database.password": "testpassword",
    "database.dbname": "test_db",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "debezium_publication",
    "topic.prefix": "dbserver1"
  }
}
curl --request POST \
  --header "Content-Type: application/json" \
  --data @postgresql-connector.json \
  http://127.0.0.1:8083/connectors
@farunurisonmez
Copy link
Author

image

@farunurisonmez
Copy link
Author

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment