@alberttwong
Last active April 26, 2024 21:09
# Debezium open data lakehouse
Using https://github.com/StarRocks/demo/tree/master/documentation-samples/datalakehouse. For simplicity, I am using the PostgreSQL, ZooKeeper, Kafka, and Kafka Connect images from Debezium, since they already include all the recommended modifications to support Debezium.

Once the docker-compose environment is up, work through the steps below.
## Set password in StarRocks

Set a password, because the Kafka connectors don't accept blank passwords:

```sql
SET PASSWORD = PASSWORD('123456');
```
## Create databases
### StarRocks

```sql
CREATE DATABASE demo;
USE demo;

CREATE TABLE `customer` (
  `col_001` bigint(20) NOT NULL COMMENT "",
  `col_002` varchar(65533) NULL COMMENT "",
  `col_003` bigint(20) NULL COMMENT "",
  `col_004` bigint(20) NULL COMMENT "",
  `col_005` bigint(20) NULL COMMENT "",
  `col_006` bigint(20) NULL COMMENT "",
  `col_007` bigint(20) NULL COMMENT "",
  `col_008` varchar(65533) NULL COMMENT "",
  `col_009` varchar(65533) NULL COMMENT "",
  `col_010` varchar(65533) NULL COMMENT "",
  `col_011` varchar(65533) NULL COMMENT "",
  `col_012` bigint(20) NULL COMMENT "",
  `col_013` bigint(20) NULL COMMENT "",
  `col_014` bigint(20) NULL COMMENT "",
  `col_015` varchar(65533) NULL COMMENT "",
  `col_016` varchar(65533) NULL COMMENT "",
  `col_017` varchar(65533) NULL COMMENT "",
  `col_018` bigint(20) NULL COMMENT "",
  `_sling_loaded_at` bigint(20) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`col_001`)
DISTRIBUTED BY HASH(`col_001`);
```
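The column names above follow a fixed `col_001`..`col_018` pattern, so the DDL can be generated rather than hand-typed. A small sketch (the varchar/bigint positions are copied from the statement above; the function name is mine):

```python
# Generate the StarRocks CREATE TABLE statement for the generic
# col_001..col_018 schema used above, plus the sling audit column.
# Varchar column positions are taken from the DDL in this gist.
VARCHAR_COLS = {2, 8, 9, 10, 11, 15, 16, 17}

def customer_ddl(table: str = "customer") -> str:
    cols = []
    for i in range(1, 19):
        name = f"col_{i:03d}"
        ctype = "varchar(65533)" if i in VARCHAR_COLS else "bigint(20)"
        null = "NOT NULL" if i == 1 else "NULL"  # only the key column is NOT NULL
        cols.append(f'  `{name}` {ctype} {null} COMMENT ""')
    cols.append('  `_sling_loaded_at` bigint(20) NULL COMMENT ""')
    body = ",\n".join(cols)
    return (
        f"CREATE TABLE `{table}` (\n{body}\n) ENGINE=OLAP\n"
        "PRIMARY KEY(`col_001`)\n"
        "DISTRIBUTED BY HASH(`col_001`);"
    )

print(customer_ddl())
```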
## Kafka Setup
### Register Kafka Connect Connectors

Register the Debezium PostgreSQL source connector:

```shell
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "postgres-tpcds-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgresql",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "postgres",
    "topic.include.list": "public.customer"
  }
}'
```

Then register the StarRocks sink connector. The basic form below loads the topic's JSON messages as-is:

```shell
curl -i localhost:8083/connectors/ -H "Content-Type: application/json" -X POST -d '{
  "name": "starrocks-kafka-connector",
  "config": {
    "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
    "topics": "postgres.public.customer",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "false",
    "starrocks.http.url": "starrocks-fe:8030",
    "starrocks.topic2table.map": "postgres.public.customer:customer",
    "starrocks.username": "root",
    "starrocks.password": "123456",
    "starrocks.database.name": "demo",
    "sink.properties.strip_outer_array": "true"
  }
}'
```

Both definitions use the same connector name, so register this variant instead if you want the Debezium transforms that unwrap change events so updates and deletes are applied too:

```shell
curl -i localhost:8083/connectors/ -H "Content-Type: application/json" -X POST -d '{
  "name": "starrocks-kafka-connector",
  "config": {
    "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
    "topics": "postgres.public.customer",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "false",
    "starrocks.http.url": "starrocks-fe:8030",
    "starrocks.topic2table.map": "postgres.public.customer:customer",
    "starrocks.username": "root",
    "starrocks.password": "123456",
    "starrocks.database.name": "demo",
    "transforms": "addfield,unwrap",
    "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "sink.properties.strip_outer_array": "true"
  }
}'
```

Alternatively, sink through the Debezium JDBC sink connector over the StarRocks MySQL protocol port:

```shell
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "jdbc-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://starrocks-fe:9030/demo",
    "connection.username": "root",
    "connection.password": "123456",
    "insert.mode": "insert",
    "delete.enabled": "true",
    "schema.evolution": "none",
    "database.time_zone": "UTC",
    "table.name.format": "customer",
    "dialect.starrocks.catalog_name": "demo",
    "topics": "postgres.public.customer"
  }
}'
```
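The same registrations can be scripted instead of pasted as one-off curl calls. A minimal sketch using only the standard library, assuming Kafka Connect's REST API is reachable on `localhost:8083` as in the curl commands above (the helper names are mine):

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors/"  # same endpoint as the curl calls

def debezium_source_config(name: str = "postgres-tpcds-connector") -> dict:
    """Payload equivalent to the Debezium source connector registration above."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": "postgresql",
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "postgres",
            "database.dbname": "postgres",
            "topic.prefix": "postgres",
            "topic.include.list": "public.customer",
        },
    }

def register(payload: dict) -> bytes:
    """POST a connector definition to the Kafka Connect REST API."""
    req = urllib.request.Request(
        CONNECT_URL,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read()

# register(debezium_source_config())  # requires the docker-compose stack to be up
```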
If you want to check connector status, you can run:

```shell
curl -H "Accept:application/json" localhost:8083/connectors/
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/postgres-tpcds-connector
```

To delete the connectors:

```shell
curl -i -X DELETE -H "Accept:application/json" localhost:8083/connectors/postgres-tpcds-connector
curl -i -X DELETE -H "Accept:application/json" localhost:8083/connectors/starrocks-kafka-connector
curl -i -X DELETE -H "Accept:application/json" localhost:8083/connectors/jdbc-connector
```
### Monitor messages in flight [optional]

```shell
bin/kafka-topics.sh --bootstrap-server kafka:29092 --list
bin/kafka-console-consumer.sh --bootstrap-server kafka:29092 --topic postgres.public.customer --from-beginning
```
## Debezium Setup
### Debezium triggering

By default, PostgreSQL tables use replica identity `default`, which records only the primary key in the WAL for updates and deletes, so Debezium can only emit complete row images on insert. Check the current setting for the `customer` table:

```sql
SELECT CASE relreplident
         WHEN 'd' THEN 'default'
         WHEN 'n' THEN 'nothing'
         WHEN 'f' THEN 'full'
         WHEN 'i' THEN 'index'
       END AS replica_identity
FROM pg_class
WHERE oid = 'public.customer'::regclass;
```

Change it from `default` to `full`:

```sql
ALTER TABLE public.customer REPLICA IDENTITY FULL;
```
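To make the effect concrete, here are illustrative (not captured) shapes of a Debezium update event before and after switching the replica identity; the column values are invented for the example:

```python
# With the default replica identity, the `before` image is missing (or carries
# only key columns), so a sink cannot reconstruct the old row. With FULL,
# the complete previous row is included in every update/delete event.
update_with_default_identity = {
    "op": "u",
    "before": None,  # no old-row image available
    "after": {"col_001": 1, "col_002": "AAAAAAAABAAAAAAA"},
}

update_with_full_identity = {
    "op": "u",
    "before": {"col_001": 1, "col_002": "OLD_VALUE"},  # full old row image
    "after": {"col_001": 1, "col_002": "AAAAAAAABAAAAAAA"},
}

def old_row(event: dict):
    """Return the previous row image if the event carries one."""
    return event.get("before")

print(old_row(update_with_default_identity))  # None
print(old_row(update_with_full_identity))
```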
## Load data into PostgreSQL

Load the CSV files into PostgreSQL with sling:

```shell
sling run --src-stream 'file://customer.csv' --src-options '{ header: false }' --tgt-conn POSTGRESLOCAL --tgt-object 'public.customer' --mode full-refresh
sling run --src-stream 'file://customer2.csv' --src-options '{ header: false }' --tgt-conn POSTGRESLOCAL --tgt-object 'public.customer' --mode full-refresh
```

To copy from PostgreSQL directly into StarRocks with sling instead (the second form declares the primary key):

```shell
sling run --src-conn POSTGRESLOCAL --src-stream 'public.customer' --tgt-conn STARROCKSLOCAL --tgt-object 'tpcds.customer' --mode full-refresh
sling run --src-conn POSTGRESLOCAL --src-stream 'public.customer' --tgt-conn STARROCKSLOCAL --tgt-object 'tpcds.customer' --primary-key col_001 --mode full-refresh
```
```
atwong@Albert-CelerData target % cat ~/.sling/env.yaml
# Environment Credentials for Sling CLI
# See https://docs.slingdata.io/sling-cli/environment
connections:
  DUCKDB:
    type: duckdb
    instance: /Users/atwong/duckdb.db
    # interactive: true
  MYSQL:
    type: mysql
    url: mysql://admin:[email protected]:9030/albert
  MYSQLLOCAL:
    type: mysql
    url: mysql://root:password@localhost:3306/employees
  POSTGRESLOCAL:
    type: postgres
    url: postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable
  STARROCKSLOCAL:
    type: starrocks
    fe_url: http://localhost:8040
    url: starrocks://root:@localhost:9030/

variables: {}
```
```
atwong@Albert-CelerData tpcds-csv % cat customer2.csv
1|AAAAAAAABAAAAAAA|980124|7135|32946|2452238|2452208|Mr.|Javier|Lewis|Y|9|12|1936|CHILE|""|[email protected]|2452508
```
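The sample row above is pipe-delimited with one field per column of the `customer` table. A quick sketch mapping it to the `col_001`.. names from the DDL (the email field is reproduced as it appears, already scrubbed, in this gist):

```python
# Split the sample customer2.csv row and map fields to the generic
# column names used by the StarRocks table.
row = ('1|AAAAAAAABAAAAAAA|980124|7135|32946|2452238|2452208|Mr.|Javier|'
       'Lewis|Y|9|12|1936|CHILE|""|[email protected]|2452508')

fields = row.split("|")
record = {f"col_{i:03d}": v for i, v in enumerate(fields, start=1)}

print(len(fields))          # 18 columns, matching col_001..col_018
print(record["col_001"])    # primary key value
print(record["col_009"])    # first name field in this sample
```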