Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save kenzo0107/74499ba64e118861c239317add2406b1 to your computer and use it in GitHub Desktop.
Save kenzo0107/74499ba64e118861c239317add2406b1 to your computer and use it in GitHub Desktop.

https://debezium.io/blog/2022/04/07/read-only-incremental-snapshots/

Shopifyのエンジニアリングチームは最近、Debezium MySQLコネクタを改良し、 コネクタによる書き込みアクセスなしでデータベースの増分スナップショットをサポートしました。 これは、Debeziumを読み取り専用レプリカにポイントする場合に必要です。 さらに、Debezium MySQLコネクタでは、増分スナップショット中にスキーマを変更できるようになりました。 このブログ投稿では、これらの機能の実装の詳細について説明します。

なぜ読み取り専用なのですか?

Netflixが変更データキャプチャフレームワークを発表した後、Debeziumは1.6リリースで増分スナップショット機能を追加しました。 Shopifyでは変更データキャプチャ(CDC)にDebeziumを使用しており、アーリーアダプターとして積極的に活用したいと考えていました。 また、書き込みやロックが発生しないソリューションを求めていました。

書き込みなしソリューションにより、読み取りレプリカからの変更をキャプチャすることができ、 CDC によってデータベース側でデータ破損が発生しないことが最大限に保証されます。

スキーマ移行のブロックが他のプロジェクトの開発に影響を及ぼしたため、 スナップショット作成と移行作業を調整する必要がありました。解決策として、スナップショットは週末のみ実行し、 結果としてスナップショット作成の頻度を可能な限り減らすようにしました。 このプロセス部分も改善の余地があると感じました。

このブログ投稿では、MySQL コネクタでの増分スナップショット中のロックフリーのスキーマ変更処理を含む、 読み取り専用増分スナップショットの実装の技術的な詳細について詳しく説明します。

増分スナップショット

Debeziumの増分スナップショットに関するブログ記事では、デフォルトの実装について詳しく説明しています。 このアルゴリズムは、2種類のシグナルに対応するシグナリングテーブルを使用します。

  1. 透かしとして snapshot-window-open/snapshot-window-close
  2. 増分スナップショットをトリガーする方法としての execute-snapshot

読み取り専用シナリオでは、両方のタイプの信号を代替信号に置き換える必要がありました。

最高水準点と最低水準点の SHOW MASTER STATUS

この解決策はMySQLに特有のもので、グローバルトランザクション識別子(GTID)に依存しています。 そのため、リードレプリカからデータを読み取る場合は、gtid_modeON に設定し、GTIDの順序を維持するようにデータベースを設定する必要があります。

前提条件

gtid_mode = ON
enforce_gtid_consistency = ON
if replica_parallel_workers > 0 set replica_preserve_commit_order = ON

アルゴリズムは SHOW MASTER STATUS クエリを実行して、チャンク選択の前後に設定された実行済み GTID を取得します。

low watermark = executed_gtid_set
high watermark = executed_gtid_set - low watermark

読み取り専用実装では、ウォーターマークはGTIDセットの形式を持ちます。例えば、 2174B383-5441-11E8-B90A-C80AA9429562:1-3, 24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

このようなウォーターマークはバイナリログストリームには表示されません。 代わりに、アルゴリズムは各イベントのGTIDをメモリ内のウォーターマークと比較します。 この実装により、古い読み取りが発生せず、チャンクにはローウォーターマークまでのイベントよりも古い変更のみが含まれないことが保証されます。

読み取り専用透かしを使用した重複排除アルゴリズム

(1) pause log event processing
  (2) GtidSet lwGtidSet := executed_gtid_set from SHOW MASTER STATUS
  (3) chunk := select next chunk from table
  (4) GtidSet hwGtidSet := executed_gtid_set from SHOW MASTER STATUS subtracted by lwGtidSet
  (5) resume log event processing
      inwindow := false
      // other steps of event processing loop
      while true do
           e := next event from changelog
           append e to outputbuffer
           if not inwindow then
               if not lwGtidSet.contains(e.gtid) //reached the low watermark
                   inwindow := true
           else
               if hwGtidSet.contains(e.gtid) //haven't reached the high watermark yet
                   if chunk contains e.key then
                       remove e.key from chunk
               else //reached the high watermark
                   for each row in chunk do
                       append row to outputbuffer
           // other steps of event processing loop

透かしチェック

データベーストランザクションは複数の行を変更する可能性があります。 この場合、複数のバイナリログイベントが同じGTIDを持つことになります。 GTIDは一意ではないため、チャンク選択ウィンドウの計算ロジックに影響します。 ウォーターマークのGTIDセットにそのGTIDが含まれていない場合、イベントはウィンドウの状態を更新します。 トランザクション完了やハートビートなどのイベントの後、同じGTIDを持つバイナリログイベントは発生しません。 これらのイベントの場合、ウォーターマークの上限に達するだけでウィンドウの開閉がトリガーされます。

重複排除は、デフォルトの実装と同様に、チャンク選択ウィンドウ内で行われます。 最後に、アルゴリズムは重複排除されたチャンクをハイウォーターマークの直後に挿入します。

含まれているテーブルの更新はありません

スナップショットを進めるには、バイナリログイベントの受信が不可欠です。 そのため、アルゴリズムはすべてのイベントと、スナップショットに含まれていないテーブルのGTIDをチェックします。

バイナリログイベントなし

MySQLサーバーは、レプリケーション接続がx秒間アイドル状態になった後にハートビートイベントを送信します。 読み取り専用実装では、バイナリログの更新頻度が低い場合にハートビートを利用します。

ハートビートのGTIDは最新のbinlogイベントと同じです。 したがって、ハートビートの場合は、ハイウォーターマークの上限に達するだけで十分です。

このアルゴリズムは、ハートビートのGTIDのserver_uuid部分を用いて、 ハイウォーターマークから最大のトランザクションIDを取得します。 実装では、ハイウォーターマークに単一の server_uuid が含まれるようにしています。 server_uuid が変更されないことで、ハートビートによってウィンドウが早期に閉じられてしまうシナリオを回避できます。 例として、以下の画像をご覧ください。

ウィンドウが開いているかどうかは関係ないため、低ウォーターマークとのハートビート比較は不要です。 これにより、高ウォーターマークと低ウォーターマークの間に新しいイベントがない場合のチェックが簡素化されます。

透かし間の変更なし

チャンク選択時にバイナリログイベントが存在しなかった場合、バイナリログイベントによってウィンドウがすぐに開いたり閉じたりすることがあります。 この場合、ハイウォーターマークは空セットになります。 この場合、スナップショットチャンクは重複排除されずにローウォーターマークの直後に挿入されます。

Kafka トピックベースのシグナル

Debeziumは、シグナリングテーブルへの挿入によってトリガーされるアドホックな増分スナップショットをサポートしています。 読み取り専用の代替手段として、特定のKafkaトピックを介してシグナルを送信することもできます。 メッセージの形式はシグナリングテーブルの構造を模倣しています。 スナップショット実行Kafkaメッセージには、以下のパラメータが含まれます。

  • data-collections - キャプチャするテーブルのリスト
  • type - INCREMENTALに設定

例:

Key: dbserver1
Value: {"type":"execute-snapshot","data": {"data-collections": ["inventory.orders"], "type": "INCREMENTAL"}}

MySQLコネクタの設定に新しいsignal.kafka.topicプロパティが追加されました。 トピックにはパーティションが1つと削除保持ポリシーが必要です。

Kafkaトピックからシグナルメッセージは別のスレッドによって取得されます。 Kafkaメッセージのキーは、database.server.nameに設定されているコネクタ名と一致する必要があります。 コネクタは、ログエントリでコネクタ名と一致しないイベントをスキップします。 メッセージキーのチェックにより、複数のコネクタでシグナルトピックを再利用できます。

増分スナップショット実行中のコネクタのオフセットには、増分スナップショットコンテキストが含まれます。 読み取り専用実装では、Kafkaシグナルのオフセットを増分スナップショットコンテキストに追加します。 オフセットを追跡することで、コネクタの再起動時にシグナルが欠落したり、二重処理されたりすることを防ぎます。

ただし、読み取り専用の増分スナップショットを実行するためにKafkaを使用する必要はなく、 シグナリングテーブルに書き込まれたデフォルトのexecute-snapshotシグナルも機能します。 将来的には、アドホックな増分スナップショットをトリガーするためのREST APIも構想されており、Debezium Server経由で公開されるか、 Kafka Connectに追加のRESTリソースとしてデプロイされる可能性があります。

増分スナップショット中のスキーマの変更

Debezium MySQLコネクタは、増分スナップショット中のスキーマ変更を許可します。 コネクタは増分スナップショット中のスキーマ変更を検出し、DDLのロックを回避するために現在のチャンクを再選択します。

⚠️ 主キーへの変更はサポートされておらず、増分スナップショット中に実行すると誤った結果が生じる可能性があることに注意してください。

MySQLコネクタのような履歴化されたDebeziumコネクタは、 バイナリログストリームからALTER TABLEなどのデータ定義言語(DDL)イベントを解析します。 コネクタは各テーブルのスキーマをメモリ内に保持し、それらのスキーマを使用して適切な変更イベントを生成します。

増分スナップショットの実装では、binlog スキーマを 2 回使用します。

  1. データベースからチャンクを選択した瞬間
  2. バイナリログストリームへのチャンク挿入の瞬間

選択時にチャンクとバイナリログスキーマを一致させる

増分スナップショットがデータベースにクエリを実行すると、行にはテーブルの最新スキーマが保持されます。 バイナリログストリームが遅れている場合、メモリ内のスキーマが最新のスキーマと異なる可能性があります。 解決策は、コネクタがバイナリログストリーム内のDDLイベントを受信するまで待機することです。 その後、コネクタはキャッシュされたテーブルの構造を使用して、正しい増分スナップショットイベントを生成できます。

スナップショットチャンクはJDBC APIを使用して選択されます。ResultSetMetaDataはチャンクのスキーマを格納します。 問題は、ResultSetMetaDataのスキーマとbinlog DDLのスキーマの形式が異なるため、同一かどうかを判断するのが困難であることです。

このアルゴリズムは、一致するResultSetベースとDDLベースのスキーマを取得するために2つのステップを実行します。 まず、コネクタはテーブルのスキーマを低ウォーターマークと高ウォーターマークの間でクエリします。 コネクタがウィンドウの終了を検出するとすぐに、バイナリログスキーマはResultSetMetaDataで最新の状態になります。 その後、コネクタはデータベースにクエリを実行し、スキーマが変更されていないことを確認します。 スキーマが変更されている場合、コネクタはこのプロセスを繰り返します。

アルゴリズムは、一致する ResultSet スキーマと binlog スキーマをメモリ内に保持し、 コネクタが各チャンクのスキーマをキャッシュされた ResultSet スキーマと比較できるようにします。

チャンクのスキーマがキャッシュされたResultSetスキーマと一致しない場合、コネクタは選択されたチャンクを削除します。 その後、アルゴリズムはResultSetスキーマとbinlogスキーマの一致検証プロセスを繰り返します。 その後、コネクタはデータベースから同じチャンクを再度選択します。

挿入時にチャンクとバイナリログスキーマを一致させる

DDLイベントは、影響を受けるテーブルのチャンクの再読み込みもトリガーします。 再読み込みにより、ウィンドウの終了時点でのバイナリログストリームのスキーマよりも古いスキーマを持つチャンクが存在するというシナリオを回避できます。 例えば、以下の図はスキーマ変更前に行われたチャンクの選択を示しています。

デモ

標準的なチュートリアルデプロイメントを使用して、 読み取り専用のアドホック増分スナップショットのデモを行います。 ソースデータベースにはMySQLを使用します。 このデモでは、複数のターミナルウィンドウを開く必要があります。

# Terminal 1 - start the deployment
# Start the deployment
export DEBEZIUM_VERSION=1.9
docker-compose -f docker-compose-mysql.yaml up

# Terminal 2

# Enable enforce_gtid_consistency and gtid_mode
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -p$MYSQL_ROOT_PASSWORD inventory -e "SET GLOBAL enforce_gtid_consistency=ON; SET GLOBAL gtid_mode=OFF_PERMISSIVE; SET GLOBAL gtid_mode=ON_PERMISSIVE; SET GLOBAL gtid_mode=ON;"'

# Confirm the changes
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -p$MYSQL_ROOT_PASSWORD inventory -e "show global variables like \"%GTID%\";"'

# Create a signaling topic
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-topics.sh \
    --create \
    --bootstrap-server kafka:9092 \
    --partitions 1 \
    --replication-factor 1 \
    --topic dbz-signals

# Start MySQL connector, capture only customers table and enable signaling
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @- <<EOF
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "table.include.list": "inventory.customers",
        "read.only": "true",
        "incremental.snapshot.allow.schema.changes": "true",
        "incremental.snapshot.chunk.size": "5000",
        "signal.kafka.topic": "dbz-signals",
        "signal.kafka.bootstrap.servers": "kafka:9092"
    }
}
EOF

ログから、table.include.list 設定に従って、customers という 1 つのテーブルのみがスナップショットされていることがわかります。

tutorial-connect-1    | 2022-02-21 04:30:03,936 INFO   MySQL|dbserver1|snapshot  Snapshotting contents of 1 tables while still in transaction   [io.debezium.relational.RelationalSnapshotChangeEventSource]

次のステップでは、データベース内の継続的なアクティビティをシミュレートします。

# Terminal 3
# Continuously consume messages from Debezium topic for customers table
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers

# Terminal 4
# Modify records in the database via MySQL client
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'i=0; while true; do mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "INSERT INTO customers VALUES(default, \"name$i\", \"surname$i\", \"email$i\");"; ((i++)); done'

トピック dbserver1.inventory.customers は継続的にメッセージストリームを受信します。 次に、コネクタを再設定して、orders テーブルもキャプチャするようにします。

# Terminal 5
# Add orders table among the captured
curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/inventory-connector/config -d @- <<EOF
{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "table.include.list": "inventory.customers,inventory.orders",
    "read.only": "true",
    "incremental.snapshot.allow.schema.changes": "true",
    "incremental.snapshot.chunk.size": "5000",
    "signal.kafka.topic": "dbz-signals",
    "signal.kafka.bootstrap.servers": "kafka:9092"
}
EOF

予想どおり、orders テーブルにはメッセージがありません。

# Terminal 5
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.orders

それでは、シグナルを送信して、増分アドホックスナップショットを開始しましょう。 ordersテーブルのスナップショットメッセージはdbserver1.inventory.ordersトピックに配信されます。 customersテーブルのメッセージは中断なく配信されます。

# Terminal 5
# Send the signal
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-producer.sh \
--broker-list kafka:9092 \
--property "parse.key=true" \
--property "key.serializer=org.apache.kafka.common.serialization.StringSerializer" \
--property "value.serializer=custom.class.serialization.JsonSerializer" \
--property "key.separator=;" \
--topic dbz-signals
dbserver1;{"type":"execute-snapshot","data": {"data-collections": ["inventory.orders"], "type": "INCREMENTAL"}}

# Check messages for orders table
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.orders

スナップショットの実行中にordersテーブル内のレコードを変更すると、正確なタイミングと順序に応じて、 readイベントまたはupdateイベントとして発行されます。

最後のステップとして、デプロイされたシステムを終了し、すべてのターミナルを閉じます。

# Shut down the cluster
docker-compose -f docker-compose-mysql.yaml down

結論

Debeziumは現在開発が活発に行われている優れた変更データキャプチャツールであり、そのコミュニティの一員であることを大変嬉しく思います。 Shopifyでは、本番環境で増分スナップショットを活用できることを大変嬉しく思っています。 同様のデータベース使用制限がある場合は、読み取り専用の増分スナップショット機能をご確認ください。 このプロジェクトは、私のチームとDebeziumチームのおかげで実現しました。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment