Skip to content

Instantly share code, notes, and snippets.

@cathalnoonan
Last active March 29, 2023 21:02
Show Gist options
  • Save cathalnoonan/b0d073274a3a5b4efe66c95b92170d73 to your computer and use it in GitHub Desktop.
Save cathalnoonan/b0d073274a3a5b4efe66c95b92170d73 to your computer and use it in GitHub Desktop.
Debezium Examples
mkdir -p ~/temp
cd ~/temp

# Clone sample code
git clone https://github.com/debezium/debezium-examples.git
cd ./debezium-examples/tutorial

alias docker-compose="docker compose"

# Start the topology as defined in https://debezium.io/documentation/reference/stable/tutorial.html
export DEBEZIUM_VERSION=2.0
docker-compose -f docker-compose-sqlserver.yaml up -d

# Initialize database and insert test data
cat debezium-sqlserver-init/inventory.sql | docker-compose -f docker-compose-sqlserver.yaml exec -T sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

# Start SQL Server connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json

Stream the Kafka messages:

# Consume messages from a Debezium topic
docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic server1.testDB.dbo.customers

Open a connection to the SQL Server (SSMS / Azure Data Studio / SqlCmd / ...) run SQL commands there.


Cleanup script:

# Shut down the cluster
docker-compose -f docker-compose-sqlserver.yaml down

SQL:

INSERT INTO dbo.customer (first_name, last_name, email)
VALUES ('john', 'smith', '[email protected]')
GO

Message on Kafka:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            }
        ],
        "optional": false,
        "name": "server1.testDB.dbo.customers.Key"
    },
    "payload": {
        "id": 1005
    }
}
{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "server1.testDB.dbo.customers.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "server1.testDB.dbo.customers.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Enum",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,last,false,incremental"
                        },
                        "default": "false",
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "sequence"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "schema"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "change_lsn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "commit_lsn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "event_serial_no"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.sqlserver.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "total_order"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "data_collection_order"
                    }
                ],
                "optional": true,
                "name": "event.block",
                "version": 1,
                "field": "transaction"
            }
        ],
        "optional": false,
        "name": "server1.testDB.dbo.customers.Envelope",
        "version": 1
    },
    "payload": {
        "before": null,
        "after": {
            "id": 1005,
            "first_name": "john",
            "last_name": "smith",
            "email": "[email protected]"
        },
        "source": {
            "version": "2.0.1.Final",
            "connector": "sqlserver",
            "name": "server1",
            "ts_ms": 1680121763927,
            "snapshot": "false",
            "db": "testDB",
            "sequence": null,
            "schema": "dbo",
            "table": "customers",
            "change_lsn": "00000027:00000308:0003",
            "commit_lsn": "00000027:00000308:0005",
            "event_serial_no": 1
        },
        "op": "c",
        "ts_ms": 1680121767420,
        "transaction": null
    }
}

SQL:

UPDATE dbo.customers
SET email='[email protected]'
WHERE email='[email protected]'
GO

Message on Kafka:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            }
        ],
        "optional": false,
        "name": "server1.testDB.dbo.customers.Key"
    },
    "payload": {
        "id": 1005
    }
}
{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "server1.testDB.dbo.customers.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "server1.testDB.dbo.customers.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Enum",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,last,false,incremental"
                        },
                        "default": "false",
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "sequence"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "schema"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "change_lsn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "commit_lsn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "event_serial_no"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.sqlserver.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "total_order"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "data_collection_order"
                    }
                ],
                "optional": true,
                "name": "event.block",
                "version": 1,
                "field": "transaction"
            }
        ],
        "optional": false,
        "name": "server1.testDB.dbo.customers.Envelope",
        "version": 1
    },
    "payload": {
        "before": {
            "id": 1005,
            "first_name": "john",
            "last_name": "smith",
            "email": "[email protected]"
        },
        "after": {
            "id": 1005,
            "first_name": "john",
            "last_name": "smith",
            "email": "[email protected]"
        },
        "source": {
            "version": "2.0.1.Final",
            "connector": "sqlserver",
            "name": "server1",
            "ts_ms": 1680122294707,
            "snapshot": "false",
            "db": "testDB",
            "sequence": null,
            "schema": "dbo",
            "table": "customers",
            "change_lsn": "00000027:000006e0:0002",
            "commit_lsn": "00000027:000006e0:0007",
            "event_serial_no": 2
        },
        "op": "u",
        "ts_ms": 1680122297292,
        "transaction": null
    }
}

SQL:

DELETE FROM dbo.customers
WHERE email='[email protected]'
GO

Message on Kafka:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            }
        ],
        "optional": false,
        "name": "server1.testDB.dbo.customers.Key"
    },
    "payload": {
        "id": 1005
    }
}
null
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment