# Debezium Open Data Lakehouse
This walkthrough is based on https://github.com/StarRocks/demo/tree/master/documentation-samples/datalakehouse. For simplicity, I am using the PostgreSQL, ZooKeeper, Kafka, and Kafka Connect images from Debezium, since they already include the recommended modifications to support Debezium.
Once the docker-compose environment is up, continue with the steps below.
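If you still need to bring the stack up, here is a minimal sketch, assuming you have cloned the demo repo and the compose file lives in that `datalakehouse` directory (the exact layout is an assumption of this sketch):

```bash
# Clone the demo and start the stack; directory layout is an assumption
git clone https://github.com/StarRocks/demo.git
cd demo/documentation-samples/datalakehouse
docker compose up -d

# Check that all containers are running
docker compose ps
```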
## Set password in StarRocks | |
The Kafka connectors do not accept a blank password, so set one for the StarRocks `root` user:

```sql
SET PASSWORD = PASSWORD('123456');
```
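If you need a client to run this, the StarRocks FE speaks the MySQL protocol. A minimal sketch, assuming the compose file maps the FE query port to `localhost:9030` (the port mapping is an assumption here):

```bash
# Connect to the StarRocks FE with the stock MySQL client.
# The localhost:9030 mapping is assumed from the demo's docker-compose defaults.
mysql -h 127.0.0.1 -P 9030 -u root

# then, inside the client:
#   SET PASSWORD = PASSWORD('123456');
```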
## Create databases | |
### StarRocks | |
```sql
CREATE DATABASE demo;
USE demo;

CREATE TABLE `customer` (
  `col_001` bigint(20) NOT NULL COMMENT "",
  `col_002` varchar(65533) NULL COMMENT "",
  `col_003` bigint(20) NULL COMMENT "",
  `col_004` bigint(20) NULL COMMENT "",
  `col_005` bigint(20) NULL COMMENT "",
  `col_006` bigint(20) NULL COMMENT "",
  `col_007` bigint(20) NULL COMMENT "",
  `col_008` varchar(65533) NULL COMMENT "",
  `col_009` varchar(65533) NULL COMMENT "",
  `col_010` varchar(65533) NULL COMMENT "",
  `col_011` varchar(65533) NULL COMMENT "",
  `col_012` bigint(20) NULL COMMENT "",
  `col_013` bigint(20) NULL COMMENT "",
  `col_014` bigint(20) NULL COMMENT "",
  `col_015` varchar(65533) NULL COMMENT "",
  `col_016` varchar(65533) NULL COMMENT "",
  `col_017` varchar(65533) NULL COMMENT "",
  `col_018` bigint(20) NULL COMMENT "",
  `_sling_loaded_at` bigint(20) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`col_001`)
DISTRIBUTED BY HASH(`col_001`);
```
## Kafka Setup | |
### Register Kafka Connect Connectors | |
Register the Debezium PostgreSQL source connector, which streams changes from `public.customer` into the `postgres.public.customer` topic:

```bash
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "postgres-tpcds-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgresql",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "postgres",
    "table.include.list": "public.customer"
  }
}'
```

Then register the StarRocks Kafka sink connector. Two variants are shown below; they share the same connector name, so register only one at a time (delete the existing one first if you switch). The first sinks the raw Debezium records:

```bash
curl -i localhost:8083/connectors/ -H "Content-Type: application/json" -X POST -d '{
  "name": "starrocks-kafka-connector",
  "config": {
    "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
    "topics": "postgres.public.customer",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "false",
    "starrocks.http.url": "starrocks-fe:8030",
    "starrocks.topic2table.map": "postgres.public.customer:customer",
    "starrocks.username": "root",
    "starrocks.password": "123456",
    "starrocks.database.name": "demo",
    "sink.properties.strip_outer_array": "true"
  }
}'
```

The second variant adds the Debezium `ExtractNewRecordState` transform and the StarRocks `AddOpFieldForDebeziumRecord` transform, so the change envelope is unwrapped and updates and deletes are applied to the primary key table:

```bash
curl -i localhost:8083/connectors/ -H "Content-Type: application/json" -X POST -d '{
  "name": "starrocks-kafka-connector",
  "config": {
    "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
    "topics": "postgres.public.customer",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "false",
    "starrocks.http.url": "starrocks-fe:8030",
    "starrocks.topic2table.map": "postgres.public.customer:customer",
    "starrocks.username": "root",
    "starrocks.password": "123456",
    "starrocks.database.name": "demo",
    "transforms": "addfield,unwrap",
    "transforms.addfield.type": "com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "sink.properties.strip_outer_array": "true"
  }
}'
```

Alternatively, register the Debezium JDBC sink connector, which writes to StarRocks over the MySQL protocol:

```bash
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "jdbc-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://starrocks-fe:9030/demo",
    "connection.username": "root",
    "connection.password": "123456",
    "insert.mode": "insert",
    "delete.enabled": "true",
    "schema.evolution": "none",
    "database.time_zone": "UTC",
    "table.name.format": "customer",
    "dialect.starrocks.catalog_name": "demo",
    "topics": "postgres.public.customer"
  }
}'
```
If you want to check status, you can run:

```bash
curl -H "Accept:application/json" localhost:8083/connectors/
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/postgres-tpcds-connector
```

To delete a connector:

```bash
curl -i -X DELETE -H "Accept:application/json" localhost:8083/connectors/postgres-tpcds-connector
curl -i -X DELETE -H "Accept:application/json" localhost:8083/connectors/starrocks-kafka-connector
curl -i -X DELETE -H "Accept:application/json" localhost:8083/connectors/jdbc-connector
```
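Kafka Connect also exposes a per-connector `/status` endpoint that reports task-level state, which is useful when a sink registers fine but silently stops consuming (`jq` here is only for pretty-printing and is optional):

```bash
# Show connector/task state (RUNNING, FAILED, ...) plus any task error trace
curl -s localhost:8083/connectors/postgres-tpcds-connector/status | jq .
curl -s localhost:8083/connectors/starrocks-kafka-connector/status | jq .
```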
### Monitor messages in flight [optional] | |
```bash
bin/kafka-topics.sh --bootstrap-server kafka:29092 --list
bin/kafka-console-consumer.sh --bootstrap-server kafka:29092 --topic postgres.public.customer --from-beginning
```
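Because these scripts address the broker as `kafka:29092`, the easiest place to run them is inside the Kafka container. A sketch, assuming the compose service is named `kafka` and the image's working directory contains `bin/` (both are assumptions):

```bash
# Run the console tools inside the Kafka container so kafka:29092 resolves;
# the service name "kafka" and the image's working directory are assumptions.
docker compose exec kafka bin/kafka-topics.sh --bootstrap-server kafka:29092 --list
docker compose exec kafka bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:29092 \
  --topic postgres.public.customer \
  --from-beginning
```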
## Debezium Setup | |
### Debezium triggering
By default, Debezium only captures inserts for this table; for update and delete events to carry the full row image, the table's replica identity must be `FULL`. Check the current setting:
```sql
SELECT CASE relreplident
         WHEN 'd' THEN 'default'
         WHEN 'n' THEN 'nothing'
         WHEN 'f' THEN 'full'
         WHEN 'i' THEN 'index'
       END AS replica_identity
FROM pg_class
WHERE oid = 'public.customer'::regclass;
```
Change the replica identity from `default` to `FULL`:

```sql
ALTER TABLE public.customer REPLICA IDENTITY FULL;
```
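These statements run against PostgreSQL, not StarRocks. One quick way to execute them, assuming the compose service is named `postgresql` (the connector's `database.hostname` suggests it is):

```bash
# Service name "postgresql" is an assumption taken from database.hostname above
docker compose exec postgresql psql -U postgres -d postgres \
  -c "ALTER TABLE public.customer REPLICA IDENTITY FULL;" \
  -c "SELECT relreplident FROM pg_class WHERE oid = 'public.customer'::regclass;"
```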
## Load data into PostgreSQL
Load the customer CSV files into PostgreSQL with sling; the change events then flow through Debezium and Kafka into StarRocks:

```bash
sling run --src-stream 'file://customer.csv' --src-options '{ header: false }' --tgt-conn POSTGRESLOCAL --tgt-object 'public.customer' --mode full-refresh
sling run --src-stream 'file://customer2.csv' --src-options '{ header: false }' --tgt-conn POSTGRESLOCAL --tgt-object 'public.customer' --mode full-refresh
```

Alternatively, sling can copy from PostgreSQL straight into StarRocks, bypassing Kafka:

```bash
sling run --src-conn POSTGRESLOCAL --src-stream 'public.customer' --tgt-conn STARROCKSLOCAL --tgt-object 'tpcds.customer' --mode full-refresh
sling run --src-conn POSTGRESLOCAL --src-stream 'public.customer' --tgt-conn STARROCKSLOCAL --tgt-object 'tpcds.customer' --primary-key col_001 --mode full-refresh
```
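To confirm the rows actually landed in StarRocks, query `demo.customer` over the FE's MySQL port (the `localhost:9030` mapping is an assumption):

```bash
# Count rows loaded into the StarRocks primary key table; port mapping is assumed
mysql -h 127.0.0.1 -P 9030 -u root -p123456 -e "SELECT COUNT(*) FROM demo.customer;"
```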
```
atwong@Albert-CelerData target % cat ~/.sling/env.yaml
# Environment Credentials for Sling CLI
# See https://docs.slingdata.io/sling-cli/environment
connections:
  DUCKDB:
    type: duckdb
    instance: /Users/atwong/duckdb.db
    # interactive: true
  MYSQL:
    type: mysql
    url: mysql://admin:[email protected]:9030/albert
  MYSQLLOCAL:
    type: mysql
    url: mysql://root:password@localhost:3306/employees
  POSTGRESLOCAL:
    type: postgres
    url: postgresql://postgres:postgres@localhost:5432/postgres?sslmode=disable
  STARROCKSLOCAL:
    type: starrocks
    fe_url: http://localhost:8040
    url: starrocks://root:@localhost:9030/
variables: {}
```
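sling can sanity-check these connection entries before running any loads via its `conns` subcommands:

```bash
# List the connections defined in ~/.sling/env.yaml and test the two used here
sling conns list
sling conns test POSTGRESLOCAL
sling conns test STARROCKSLOCAL
```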
```
atwong@Albert-CelerData tpcds-csv % cat customer2.csv
1|AAAAAAAABAAAAAAA|980124|7135|32946|2452238|2452208|Mr.|Javier|Lewis|Y|9|12|1936|CHILE|""|[email protected]|2452508
```