to translate into Japanese https://debezium.io/blog/2021/10/07/incremental-snapshots/
Debezium の創業以来の最大の問題点の一つは、キャプチャ対象テーブルリストの変更に対するサポートが最適とは言えないことでした。
ユーザーは、キャプチャ対象テーブルのリスト(table.include.list および関連オプション)を指定して新しいコネクタを作成しますが、 後々、当初 CDC に含まれていなかったテーブルもキャプチャするために、この設定を調整する必要が生じることがあります。 これらのテーブルからの変更をストリーミングするだけで十分であれば、この問題は非常に簡単に解決できます。 しかし、テーブルの既存の内容もキャプチャする必要がある場合はどうでしょうか?
テーブル内の既存データのキャプチャは、従来、Debezium のスナップショットフェーズで実行されます。 このフェーズは、コネクタの初回起動時に 1 回実行され、その目的は、ある時点での一貫性のあるデータを取得することです (保存データを移動中のデータに変換)。 これはかなり長い操作になる可能性があり、定義上、完全に実行されるか、まったく実行されないかのどちらかになります (トランザクション セマンティクスに少し似ています)。
つまり、たとえばコネクタの再起動などによりスナップショットが完了しなかった場合、 最初から再実行する必要があり、すでに実行された内容はすべて破棄されます。 また、スナップショットの作成中は、データベースで並列実行されるデータ変更は、スナップショットが完了するまでストリーミングされません。 これにより、非常に大きなスナップショットの場合、トランザクション ログを利用可能な状態に維持する必要があるため、データベースリソースに問題が発生する可能性があります。
したがって、解決すべき問題は 3 つあります。
- 既存のデータをストリーミングする必要がある場合、キャプチャされたテーブルリストにテーブルを追加することはほぼ不可能です。
- 終了または再開できない、一貫性のあるスナップショットを作成するための長時間実行プロセス
- スナップショットが完了するまでブロックされているデータストリーミングを変更する
この問題は周知の事実であり、時間の経過とともに回避策を開発し、改善策や新たな解決策も検討してきました。 回避策として、一般的に推奨されていたのは複数のコネクタを使用するアプローチでした。 ユーザーには以下の手順を指示しました。
- コネクタを停止する
- 新しいテーブルのスナップショットを取得するために新しいコネクタを作成する(initial_only スナップショットモードを使用)
- 完了したら、新しいコネクタを停止する 新しくキャプチャしたテーブルをリストに追加して、古いコネクタを再設定し、起動する
この方法はある程度は効果がありましたが、非常に扱いにくく、前述のスナップショットの一貫性に関する疑問はすべて依然として残ります。
次のステップは、DBZ-175 を通じて、MySQL 用の Debezium コネクタにコミュニティ貢献することでした。 これは、複数のバイナリログリーダーを配置するという概念に基づいています。 1 つのリーダーは最初に設定されたテーブルをキャプチャし、もう 1 つのリーダーは新しいテーブルのスナップショットを取得し、 新しいテーブルからの変更をキャプチャします。後者のリーダーは元のリーダーに追いつき、それらを調整して 1 つのリーダーにマージします。
コードはうまく動作していましたが、プロセス自体が非常に複雑で、特殊なケースでエラーが発生する可能性があったため、 インキュベーション段階から抜け出すことはありませんでした。 最後に、独創的なアプローチではありましたが、残念ながら他のコネクタへの移植性はありませんでした。
2019年後半、Netflixエンジニアリングチームは、社内で変更データキャプチャフレームワークを開発したと発表しました。 また、ウォーターマークを用いた同時スナップショット実行という革新的なソリューションも考案しました。 このソリューションは、Andreas Andreakis氏とIoannis Papapanagiotou氏による論文「DBLog: A Watermark Based Change-Data-Capture Framework」で解説されています。
このアプローチの根底にある考え方は、スナップショットと同時に変更データのストリーミングを継続的に実行することです。
フレームワークは、トランザクションログに最低ウォーターマークと最高ウォーターマークを挿入し(ソースデータベースへの書き込みにより)、 その2つのポイント間でスナップショットテーブルの一部を読み取ります。 フレームワークは、ウォーターマーク間のデータベースの変更を記録し、ウィンドウ内で同じレコードがスナップショットされ変更された場合、 それらをスナップショットの値と照合します。 つまり、データはチャンク単位でスナップショットされます。コネクタ起動時に長いプロセスは不要です。
また、コネクタがクラッシュしたり、制御された終了が発生した場合でも、最後に完了したチャンクからスナップショットを再開できます。 Netflix によれば、実装は MySQL および PostgreSQL データベース向けに提供されています。
Debezium のウォーターマークベースのスナップショット手法の実装に進む前に、少し回り道する必要があります。
場合によっては、Debezium を外部から制御し、要求されたアクションを強制的に実行させることが便利なことがあります。 例えば、すでにスナップショットを作成したテーブルを再度スナップショットする必要がある場合(いわゆるアドホックスナップショット)を考えてみましょう。 ユーザーは、現在の操作を一時停止してスナップショットを作成するコマンドを Debezium に送信する必要があります。 そのために、Debezium はシグナリングテーブルを介して発行されるシグナルという概念を定義しています。 これは、ユーザーと Debezium 間の通信用に指定された特別なテーブルです。 Debezium はテーブルをキャプチャし、ユーザーが特定の操作を実行する必要がある場合は、シグナリングテーブルにレコードを書き込む(シグナルを送信する)だけです。 Debezium はキャプチャされた変更を受け取り、必要なアクションを実行します。
DBLogのスナップショット手法を知った時、この手法は汎用性が高く、Debeziumにも導入できると判断しました。 また、Debeziumコネクタフレームワークを使用して、様々なコネクタ間で多くのコードベースを共有しているため、 Debeziumコアコンポーネントに実装し、すべてのコネクタで同時にこの機能のメリットを享受できるようにすることが目標でした。 設計と実装は、DDD-3 Debezium設計ドキュメントに基づいて進められました。
Debezium の増分スナップショットは、アドホックスナップショットの形で利用できます。 ユーザーはスナップショットを実行するためにコネクタを設定するのではなく、シグナリングメカニズムを使用してスナップショットシグナルを送信し、 テーブルセットのスナップショットをトリガーします。 このシグナルは execute-snapshot と呼ばれ、シグナルメッセージは次の形式に従います。
{"data-collections": ["<table-id-1>", "<table-id-2>", "<table-id-3>", ...]}
テーブル スナップショットが要求されると、Debezium は次の処理を実行します。
- テーブル内の最大の主キーを取得します。これがスナップショットエンドポイントであり、その値はコネクタオフセットに格納されます。
incremental.snapshot.chunk.size
構成オプションで指定されたサイズに基づいて、テーブルをチャンクに分割します。
チャンクがクエリされると、動的 SQL ステートメントが構築され、次の incremental.snapshot.chunk.size
レコードが選択されます。
これらのレコードの主キーは、前のチャンクの最後の主キー (または最初のチャンクの最初の主キー) よりも大きく、記録された最大主キー以下です。
デフォルトのチャンクサイズは1,024です。効率化のために値を増やすことは可能です(スナップショットクエリの実行回数を減らすため)。 ただし、バッファに必要なメモリ消費量の増加とのバランスを考慮する必要があります。 ご自身の環境で実際に実験を行い、状況に最適な設定を見つけることをお勧めします。
チャンクの読み取りは少し複雑な手順です。
- スナップショットウィンドウオープン信号が送信されます。
- チャンククエリが実行され、チャンクの内容がメモリに読み込まれます。
- スナップショットウィンドウクローズ信号が送信されます。
なぜこれが必要なのでしょうか?データベースにクエリを実行するだけでは不十分なのはなぜでしょうか?その答えは次の図にあります。
Debezium はデータベースにアクセスする唯一のプロセスではありません。 多数のプロセスが同時にデータベースにアクセスし、現在スナップショットされている同じレコードにアクセスする可能性があります。 図に示すように、データへの変更はコミット順序に基づいてトランザクションログに書き込まれます。 チャンク読み取りトランザクションの正確なタイミングを計測して潜在的な競合を特定することは不可能であるため、 競合が発生する可能性のある時間を区切るために、ウィンドウのオープンイベントとクローズイベントが追加されます。 Debezium の役割は、これらの競合の重複排除です。
そのため、Debezium はチャンクによって生成されたすべてのイベントをバッファに記録します。 スナップショットウィンドウオープン信号を受信すると、トランザクションログからのすべてのイベントが、 スナップショットされたテーブルに属しているかどうかがチェックされます。 属している場合、バッファに主キーが含まれているかどうかがチェックされます。 主キーが含まれている場合、スナップショットイベントは競合の可能性があるため、バッファから削除されます。 スナップショットイベントとトランザクションログイベントを正しく順序付けることができないため、トランザクションログイベントのみが保持されます。 スナップショットウィンドウクローズ信号を受信すると、バッファに残っているスナップショットイベントは下流に送信されます。
次の図は、このようなバッファがどのように機能し、 トランザクション ログ イベントが下流に送信される前にどのようにフィルタリングされるかの例を示しています。
レコード K2、K3、K4 はすでにデータベースに存在します。 スナップショット ウィンドウが開く前に、レコード K1 が挿入され、K2 が更新され、K3 が削除されます。 これらのイベントは、ログから読み取られると下流に送信されます。 スナップショット ウィンドウが開き、そのクエリによって K1、K2、K4 がバッファに選択されます。 ウィンドウが開いている間に、K4 の削除がトランザクション ログから取得され、 K4 のスナップショット イベントがバッファから削除され、削除イベントが下流に送信されます。 K5 と K6 が挿入され、ログから取得されると、対応するイベントが発行されます。 特定のタイミングによっては、バッファ内にそれらの読み取りイベントも存在する場合があります (画像では K5 の場合)。これは削除されます。 スナップショット ウィンドウが閉じると、K1 と K2 の残りのスナップショット イベントがバッファから発行されます。
ここまで、増分スナップショットの概念を用いることで、コネクタの実行中に必要に応じて同じテーブルのスナップショットを繰り返し取得できることを示しました。 また、この実行によってトランザクションログからのストリーミングが停止しないことも示しました。 最後に、プロセスの一時停止と再開について説明します。
増分スナップショットの実行中は、各メッセージオフセットに増分スナップショットコンテキストが追加されます。 コンテキストは以下の3つの情報で表されます。
- スナップショットを作成するテーブルのリスト。最初のテーブルは現在スナップショットされているテーブルです。
- テーブルの最大の主キー
- 下流に送信された増分スナップショットの最後のイベントの主キー
これら3つの項目があれば、コネクタの再起動後(意図的な再起動でもクラッシュ後でも)にスナップショットを再開できます。 コネクタの起動時に、スナップショットを担当するコンポーネントはオフセットからデータを読み取ります。 コンポーネントは内部状態を初期化し、最後に処理されたイベントの後にスナップショットを再開します。 コネクタが実行中でない間に挿入または更新されたレコードは、通常のストリーム読み取りによって処理されます。 つまり、進行中のスナップショットの対象外となることに注意してください。
このアプローチにより、プロセスの堅牢性、再起動とクラッシュに対する回復力が確保され、 再配信されるイベントの数が最小限に抑えられます (少なくとも 1 回の配信セマンティクスは引き続き適用されます)。
増分スナップショットには、初期の一貫性のあるスナップショットと比較して、いくつかの欠点があります。
- スナップショットされたテーブルには主キーが含まれている必要があります
- スナップショット処理中にテーブルからイベントが削除されると、次のいずれかの状況が発生する可能性があります。
read
イベントとdelete
イベントは下流の消費者によって受信されるdelete
イベントのみが受信されます
- スナップショット処理中にテーブル内のイベントが更新されると、次のいずれかの状況が発生する可能性があります。
read
イベントとupdate
イベントは下流の消費者によって受信されるupdate
イベントとread
イベントが受信されます(順序が逆であることに注意)update
イベントのみが受信されます(read
イベントを発行するチャンク内で更新が発生した場合、そのread
イベントは重複排除中に破棄されます)
一般的に、read
イベントはテーブル内のレコードの初期状態ではなく、任意の時点におけるレコードの状態として理解する必要があります。
Debeziumでは、従来の初期スナップショットと比較して、コンシューマのセマンティクスが若干変更されています。
増分スナップショットの完了後、コンシューマが完全なデータセットを受け取ったことが保証されますが、
すべてのレコードに対して読み取り(スナップショット)イベントが発生するわけではなく、代わりにupdate
イベントが発生する可能性があります。
delete
イベントについても同様です。
コンシューマは、以前に見たことのないレコードに対してこのようなイベントが発生する可能性があることに留意する必要があります。
一般的な概念について説明したので、例を使ってもう少し詳しく見ていきましょう。 チュートリアルの標準デプロイメントを使用して、アドホック増分スナップショットのデモを行います。 ソースデータベースにはPostgreSQLを使用します。 このデモでは、複数のターミナルウィンドウが必要になります。
まず、デプロイメントを開始し、シグナリング テーブルを作成し、コネクタを起動します。
# Terminal 1 - start the deployment
# Start the deployment
export DEBEZIUM_VERSION=1.7
docker-compose -f docker-compose-postgres.yaml up
# Terminal 2
# Create a signalling table
echo "CREATE TABLE inventory.dbz_signal (id varchar(64), type varchar(32), data varchar(2048))" | docker-compose -f docker-compose-postgres.yaml exec -T postgres env PGOPTIONS="--search_path=inventory" bash -c "psql -U $POSTGRES_USER postgres"
# Start Postgres connector, capture only customers table and enable signalling
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @- <<EOF
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include": "inventory",
"table.include.list": "inventory.customers,inventory.dbz_signal",
"signal.data.collection": "inventory.dbz_signal"
}
}
EOF
ログから、table.include.list 設定に従って、customers という 1 つのテーブルのみがスナップショットされていることがわかります。
connect_1 | 2021-09-24 13:38:21,781 INFO Postgres|dbserver1|snapshot Snapshotting contents of 1 tables while still in transaction [io.debezium.relational.RelationalSnapshotChangeEventSource]
次のステップでは、データベース内の継続的なアクティビティをシミュレートします。
# Terminal 3
# Continuously consume messages from Debezium topic for customers table
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
# Terminal 4
# Modify records in the database via Postgres client
docker-compose -f docker-compose-postgres.yaml exec postgres env PGOPTIONS="--search_path=inventory" bash -c "i=0; while true; do psql -U $POSTGRES_USER postgres -c \"INSERT INTO customers VALUES(default,'name\$i','surname\$i','email\$i')\"; ((i++)); done"
トピック dbserver1.inventory.customers
は継続的にメッセージストリームを受信します。
次に、コネクタを再設定して、orders
テーブルもキャプチャするようにします。
# Terminal 5
# Add orders table among the captured
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/inventory-connector/config -d @- <<EOF
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include": "inventory",
"table.include.list": "inventory.customers,inventory.dbz_signal,inventory.orders",
"signal.data.collection": "inventory.dbz_signal"
}
EOF
予想どおり、orders テーブルにはメッセージがありません。
# Terminal 5
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.orders
それでは、シグナルを送信して、増分アドホックスナップショットを開始しましょう。
orders
テーブルのスナップショットメッセージは dbserver1.inventory.orders
トピックに配信されます。
customers
テーブルのメッセージは中断なく配信されます。
# Terminal 5
# Send the signal
echo "INSERT INTO inventory.dbz_signal VALUES ('signal-1', 'execute-snapshot', '{\"data-collections\": [\"inventory.orders\"]}')" | docker-compose -f docker-compose-postgres.yaml exec -T postgres env PGOPTIONS="--search_path=inventory" bash -c "psql -U $POSTGRES_USER postgres"
# Check messages for orders table
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.orders
スナップショットの実行中に orders
テーブル内のレコードを変更すると、正確なタイミングと順序に応じて、
read
イベントまたは update
イベントとして発行されます。
最後のステップとして、デプロイされたシステムを終了し、すべてのターミナルを閉じます。
# Shut down the cluster
docker-compose -f docker-compose-postgres.yaml down
このブログ記事では、DBLog論文で紹介された増分スナップショットの概念の動機について議論しました。 また、ここで説明した機能を実現するために過去に用いられてきた手法を検証しました。 さらに、この斬新なスナップショット手法をDebeziumに実装する方法について深く掘り下げ、最終的には実際に運用してみました。
増分スナップショットが皆様のお役に立てば幸いです。 皆様からのフィードバック、ご経験、そしてユースケースをお待ちしております。 今後のブログ投稿では、読み取り専用データベースの増分スナップショットのサポート(Debezium MySQLコネクタ バージョン1.7以降でサポート)と、 データベーステーブルではなくKafkaトピックをシグナル手段として使用してアドホックスナップショットをトリガーする方法についてご紹介します。