https://debezium.io/blog/2022/04/07/read-only-incremental-snapshots/
Shopifyのエンジニアリングチームは最近、Debezium MySQLコネクタを改良し、 コネクタによる書き込みアクセスなしでデータベースの増分スナップショットをサポートしました。 これは、Debeziumを読み取り専用レプリカにポイントする場合に必要です。 さらに、Debezium MySQLコネクタでは、増分スナップショット中にスキーマを変更できるようになりました。 このブログ投稿では、これらの機能の実装の詳細について説明します。
Netflixが変更データキャプチャフレームワークを発表した後、Debeziumは1.6リリースで増分スナップショット機能を追加しました。 Shopifyでは変更データキャプチャ(CDC)にDebeziumを使用しており、アーリーアダプターとして積極的に活用したいと考えていました。 また、書き込みやロックが発生しないソリューションを求めていました。
書き込みなしソリューションにより、読み取りレプリカからの変更をキャプチャすることができ、 CDC によってデータベース側でデータ破損が発生しないことが最大限に保証されます。
スキーマ移行のブロックが他のプロジェクトの開発に影響を及ぼしたため、 スナップショット作成と移行作業を調整する必要がありました。解決策として、スナップショットは週末のみ実行し、 結果としてスナップショット作成の頻度を可能な限り減らすようにしました。 このプロセス部分も改善の余地があると感じました。
このブログ投稿では、MySQL コネクタでの増分スナップショット中のロックフリーのスキーマ変更処理を含む、 読み取り専用増分スナップショットの実装の技術的な詳細について詳しく説明します。
Debeziumの増分スナップショットに関するブログ記事では、デフォルトの実装について詳しく説明しています。 このアルゴリズムは、2種類のシグナルに対応するシグナリングテーブルを使用します。
- 透かしとして
snapshot-window-open/snapshot-window-close
- 増分スナップショットをトリガーする方法としての
execute-snapshot
読み取り専用シナリオでは、両方のタイプの信号を代替信号に置き換える必要がありました。
この解決策はMySQLに特有のもので、グローバルトランザクション識別子(GTID)に依存しています。
そのため、リードレプリカからデータを読み取る場合は、gtid_mode
を ON
に設定し、GTIDの順序を維持するようにデータベースを設定する必要があります。
gtid_mode = ON
enforce_gtid_consistency = ON
if replica_parallel_workers > 0 set replica_preserve_commit_order = ON
アルゴリズムは SHOW MASTER STATUS クエリを実行して、チャンク選択の前後に設定された実行済み GTID を取得します。
low watermark = executed_gtid_set
high watermark = executed_gtid_set - low watermark
読み取り専用実装では、ウォーターマークはGTIDセットの形式を持ちます。例えば、
2174B383-5441-11E8-B90A-C80AA9429562:1-3, 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19
このようなウォーターマークはバイナリログストリームには表示されません。 代わりに、アルゴリズムは各イベントのGTIDをメモリ内のウォーターマークと比較します。 この実装により、古い読み取りが発生せず、チャンクにはローウォーターマークまでのイベントよりも古い変更のみが含まれないことが保証されます。
(1) pause log event processing
(2) GtidSet lwGtidSet := executed_gtid_set from SHOW MASTER STATUS
(3) chunk := select next chunk from table
(4) GtidSet hwGtidSet := executed_gtid_set from SHOW MASTER STATUS subtracted by lwGtidSet
(5) resume log event processing
inwindow := false
// other steps of event processing loop
while true do
e := next event from changelog
append e to outputbuffer
if not inwindow then
if not lwGtidSet.contains(e.gtid) //reached the low watermark
inwindow := true
else
if hwGtidSet.contains(e.gtid) //haven't reached the high watermark yet
if chunk contains e.key then
remove e.key from chunk
else //reached the high watermark
for each row in chunk do
append row to outputbuffer
// other steps of event processing loop
データベーストランザクションは複数の行を変更する可能性があります。 この場合、複数のバイナリログイベントが同じGTIDを持つことになります。 GTIDは一意ではないため、チャンク選択ウィンドウの計算ロジックに影響します。 ウォーターマークのGTIDセットにそのGTIDが含まれていない場合、イベントはウィンドウの状態を更新します。 トランザクション完了やハートビートなどのイベントの後、同じGTIDを持つバイナリログイベントは発生しません。 これらのイベントの場合、ウォーターマークの上限に達するだけでウィンドウの開閉がトリガーされます。
重複排除は、デフォルトの実装と同様に、チャンク選択ウィンドウ内で行われます。 最後に、アルゴリズムは重複排除されたチャンクをハイウォーターマークの直後に挿入します。
スナップショットを進めるには、バイナリログイベントの受信が不可欠です。 そのため、アルゴリズムはすべてのイベントと、スナップショットに含まれていないテーブルのGTIDをチェックします。
MySQLサーバーは、レプリケーション接続がx秒間アイドル状態になった後にハートビートイベントを送信します。 読み取り専用実装では、バイナリログの更新頻度が低い場合にハートビートを利用します。
ハートビートのGTIDは最新のbinlogイベントと同じです。 したがって、ハートビートの場合は、ハイウォーターマークの上限に達するだけで十分です。
このアルゴリズムは、ハートビートのGTIDのserver_uuid部分を用いて、
ハイウォーターマークから最大のトランザクションIDを取得します。
実装では、ハイウォーターマークに単一の server_uuid
が含まれるようにしています。
server_uuid
が変更されないことで、ハートビートによってウィンドウが早期に閉じられてしまうシナリオを回避できます。
例として、以下の画像をご覧ください。
ウィンドウが開いているかどうかは関係ないため、低ウォーターマークとのハートビート比較は不要です。 これにより、高ウォーターマークと低ウォーターマークの間に新しいイベントがない場合のチェックが簡素化されます。
チャンク選択時にバイナリログイベントが存在しなかった場合、バイナリログイベントによってウィンドウがすぐに開いたり閉じたりすることがあります。 この場合、ハイウォーターマークは空セットになります。 この場合、スナップショットチャンクは重複排除されずにローウォーターマークの直後に挿入されます。
Debeziumは、シグナリングテーブルへの挿入によってトリガーされるアドホックな増分スナップショットをサポートしています。 読み取り専用の代替手段として、特定のKafkaトピックを介してシグナルを送信することもできます。 メッセージの形式はシグナリングテーブルの構造を模倣しています。 スナップショット実行Kafkaメッセージには、以下のパラメータが含まれます。
data-collections
- キャプチャするテーブルのリストtype
- INCREMENTALに設定
例:
Key: dbserver1
Value: {"type":"execute-snapshot","data": {"data-collections": ["inventory.orders"], "type": "INCREMENTAL"}}
MySQLコネクタの設定に新しいsignal.kafka.topic
プロパティが追加されました。
トピックにはパーティションが1つと削除保持ポリシーが必要です。
Kafkaトピックからシグナルメッセージは別のスレッドによって取得されます。
Kafkaメッセージのキーは、database.server.name
に設定されているコネクタ名と一致する必要があります。
コネクタは、ログエントリでコネクタ名と一致しないイベントをスキップします。
メッセージキーのチェックにより、複数のコネクタでシグナルトピックを再利用できます。
増分スナップショット実行中のコネクタのオフセットには、増分スナップショットコンテキストが含まれます。 読み取り専用実装では、Kafkaシグナルのオフセットを増分スナップショットコンテキストに追加します。 オフセットを追跡することで、コネクタの再起動時にシグナルが欠落したり、二重処理されたりすることを防ぎます。
ただし、読み取り専用の増分スナップショットを実行するためにKafkaを使用する必要はなく、
シグナリングテーブルに書き込まれたデフォルトのexecute-snapshot
シグナルも機能します。
将来的には、アドホックな増分スナップショットをトリガーするためのREST APIも構想されており、Debezium Server経由で公開されるか、
Kafka Connectに追加のRESTリソースとしてデプロイされる可能性があります。
Debezium MySQLコネクタは、増分スナップショット中のスキーマ変更を許可します。 コネクタは増分スナップショット中のスキーマ変更を検出し、DDLのロックを回避するために現在のチャンクを再選択します。
MySQLコネクタのような履歴化されたDebeziumコネクタは、
バイナリログストリームからALTER TABLE
などのデータ定義言語(DDL)イベントを解析します。
コネクタは各テーブルのスキーマをメモリ内に保持し、それらのスキーマを使用して適切な変更イベントを生成します。
増分スナップショットの実装では、binlog スキーマを 2 回使用します。
- データベースからチャンクを選択した瞬間
- バイナリログストリームへのチャンク挿入の瞬間
増分スナップショットがデータベースにクエリを実行すると、行にはテーブルの最新スキーマが保持されます。 バイナリログストリームが遅れている場合、メモリ内のスキーマが最新のスキーマと異なる可能性があります。 解決策は、コネクタがバイナリログストリーム内のDDLイベントを受信するまで待機することです。 その後、コネクタはキャッシュされたテーブルの構造を使用して、正しい増分スナップショットイベントを生成できます。
スナップショットチャンクはJDBC APIを使用して選択されます。ResultSetMetaDataはチャンクのスキーマを格納します。 問題は、ResultSetMetaDataのスキーマとbinlog DDLのスキーマの形式が異なるため、同一かどうかを判断するのが困難であることです。
このアルゴリズムは、一致するResultSetベースとDDLベースのスキーマを取得するために2つのステップを実行します。 まず、コネクタはテーブルのスキーマを低ウォーターマークと高ウォーターマークの間でクエリします。 コネクタがウィンドウの終了を検出するとすぐに、バイナリログスキーマはResultSetMetaDataで最新の状態になります。 その後、コネクタはデータベースにクエリを実行し、スキーマが変更されていないことを確認します。 スキーマが変更されている場合、コネクタはこのプロセスを繰り返します。
アルゴリズムは、一致する ResultSet スキーマと binlog スキーマをメモリ内に保持し、 コネクタが各チャンクのスキーマをキャッシュされた ResultSet スキーマと比較できるようにします。
チャンクのスキーマがキャッシュされたResultSetスキーマと一致しない場合、コネクタは選択されたチャンクを削除します。 その後、アルゴリズムはResultSetスキーマとbinlogスキーマの一致検証プロセスを繰り返します。 その後、コネクタはデータベースから同じチャンクを再度選択します。
DDLイベントは、影響を受けるテーブルのチャンクの再読み込みもトリガーします。 再読み込みにより、ウィンドウの終了時点でのバイナリログストリームのスキーマよりも古いスキーマを持つチャンクが存在するというシナリオを回避できます。 例えば、以下の図はスキーマ変更前に行われたチャンクの選択を示しています。
標準的なチュートリアルデプロイメントを使用して、 読み取り専用のアドホック増分スナップショットのデモを行います。 ソースデータベースにはMySQLを使用します。 このデモでは、複数のターミナルウィンドウを開く必要があります。
# Terminal 1 - start the deployment
# Start the deployment
export DEBEZIUM_VERSION=1.9
docker-compose -f docker-compose-mysql.yaml up
# Terminal 2
# Enable enforce_gtid_consistency and gtid_mode
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -p$MYSQL_ROOT_PASSWORD inventory -e "SET GLOBAL enforce_gtid_consistency=ON; SET GLOBAL gtid_mode=OFF_PERMISSIVE; SET GLOBAL gtid_mode=ON_PERMISSIVE; SET GLOBAL gtid_mode=ON;"'
# Confirm the changes
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -p$MYSQL_ROOT_PASSWORD inventory -e "show global variables like \"%GTID%\";"'
# Create a signaling topic
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-topics.sh \
--create \
--bootstrap-server kafka:9092 \
--partitions 1 \
--replication-factor 1 \
--topic dbz-signals
# Start MySQL connector, capture only customers table and enable signaling
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @- <<EOF
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"table.include.list": "inventory.customers",
"read.only": "true",
"incremental.snapshot.allow.schema.changes": "true",
"incremental.snapshot.chunk.size": "5000",
"signal.kafka.topic": "dbz-signals",
"signal.kafka.bootstrap.servers": "kafka:9092"
}
}
EOF
ログから、table.include.list
設定に従って、customers
という 1 つのテーブルのみがスナップショットされていることがわかります。
tutorial-connect-1 | 2022-02-21 04:30:03,936 INFO MySQL|dbserver1|snapshot Snapshotting contents of 1 tables while still in transaction [io.debezium.relational.RelationalSnapshotChangeEventSource]
次のステップでは、データベース内の継続的なアクティビティをシミュレートします。
# Terminal 3
# Continuously consume messages from Debezium topic for customers table
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
# Terminal 4
# Modify records in the database via MySQL client
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'i=0; while true; do mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "INSERT INTO customers VALUES(default, \"name$i\", \"surname$i\", \"email$i\");"; ((i++)); done'
トピック dbserver1.inventory.customers
は継続的にメッセージストリームを受信します。
次に、コネクタを再設定して、orders
テーブルもキャプチャするようにします。
# Terminal 5
# Add orders table among the captured
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/inventory-connector/config -d @- <<EOF
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"table.include.list": "inventory.customers,inventory.orders",
"read.only": "true",
"incremental.snapshot.allow.schema.changes": "true",
"incremental.snapshot.chunk.size": "5000",
"signal.kafka.topic": "dbz-signals",
"signal.kafka.bootstrap.servers": "kafka:9092"
}
EOF
予想どおり、orders
テーブルにはメッセージがありません。
# Terminal 5
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.orders
それでは、シグナルを送信して、増分アドホックスナップショットを開始しましょう。
orders
テーブルのスナップショットメッセージはdbserver1.inventory.orders
トピックに配信されます。
customers
テーブルのメッセージは中断なく配信されます。
# Terminal 5
# Send the signal
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-producer.sh \
--broker-list kafka:9092 \
--property "parse.key=true" \
--property "key.serializer=org.apache.kafka.common.serialization.StringSerializer" \
--property "value.serializer=custom.class.serialization.JsonSerializer" \
--property "key.separator=;" \
--topic dbz-signals
dbserver1;{"type":"execute-snapshot","data": {"data-collections": ["inventory.orders"], "type": "INCREMENTAL"}}
# Check messages for orders table
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.orders
スナップショットの実行中にorders
テーブル内のレコードを変更すると、正確なタイミングと順序に応じて、
read
イベントまたはupdate
イベントとして発行されます。
最後のステップとして、デプロイされたシステムを終了し、すべてのターミナルを閉じます。
# Shut down the cluster
docker-compose -f docker-compose-mysql.yaml down
Debeziumは現在開発が活発に行われている優れた変更データキャプチャツールであり、そのコミュニティの一員であることを大変嬉しく思います。 Shopifyでは、本番環境で増分スナップショットを活用できることを大変嬉しく思っています。 同様のデータベース使用制限がある場合は、読み取り専用の増分スナップショット機能をご確認ください。 このプロジェクトは、私のチームとDebeziumチームのおかげで実現しました。