Skip to content

Instantly share code, notes, and snippets.

https://iceberg.apache.org/docs/nightly/aws/

Iceberg AWS 統合

Icebergは、iceberg-awsモジュールを通じてさまざまなAWSサービスとの統合を提供します。 このセクションでは、AWSでIcebergを使用する方法について説明します。

AWS統合の有効化

iceberg-aws モジュールは、Spark および Flink エンジンランタイム(バージョン 0.11.0 以降)にバンドルされています。

https://iceberg.apache.org/docs/nightly/kafka-connect/

Kafka コネクト

Kafka Connectは、コネクタを介してKafkaとの間でデータをやり取りするための人気のフレームワークです。 KafkaからS3にデータを書き込むためのS3シンクや、リレーショナルデータベースからKafkaに変更データキャプチャレコードを書き込むための Debeziumソースコネクタなど、様々なコネクタが利用可能です。

シンプルで分散化された分散アーキテクチャを採用しています。クラスターは複数のワーカープロセスで構成され、 コネクタはこれらのプロセス上でタスクを実行して作業を実行します。

import socket
msk_host = "<MSK Cluster host>"
port = 9098
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((msk_host, port))
if result == 0:
print("✅ Glue から MSK への接続成功!")

https://debezium.io/blog/2021/10/20/using-debezium-create-data-lake-with-apache-iceberg/

今日では、分析、レポート、機械学習のニーズに合わせてデータレイクを構築するのが一般的です。

このブログ記事では、データレイクを構築するシンプルな方法をご紹介します。 このソリューションでは、Debeziumベースのリアルタイムデータパイプラインを使用し、ACIDトランザクションとSQL更新をサポートし、 高いスケーラビリティを実現します。 また、データフィードの構築にApache KafkaやApache Sparkアプリケーションは不要であるため、ソリューション全体の複雑さを軽減できます。

まず、データレイクの概念について簡単に説明しましょう。

https://debezium.io/documentation//reference/3.0/configuration/signalling.html

Debeziumコネクタへの信号の送信

概要

Debezium のシグナリングメカニズムは、コネクタの動作を変更したり、テーブルのアドホックスナップショットの開始など、 1 回限りのアクションをトリガーしたりする方法を提供します。 シグナルを使用してコネクタに特定のアクションを実行させるには、以下のチャネルのいずれかを使用するようにコネクタを設定します。

https://debezium.io/blog/2022/04/07/read-only-incremental-snapshots/

Shopifyのエンジニアリングチームは最近、Debezium MySQLコネクタを改良し、 コネクタによる書き込みアクセスなしでデータベースの増分スナップショットをサポートしました。 これは、Debeziumを読み取り専用レプリカにポイントする場合に必要です。 さらに、Debezium MySQLコネクタでは、増分スナップショット中にスキーマを変更できるようになりました。 このブログ投稿では、これらの機能の実装の詳細について説明します。

なぜ読み取り専用なのですか?

to translate into Japanese https://debezium.io/blog/2021/10/07/incremental-snapshots/

増分スナップショットを使用する理由

Debezium の創業以来の最大の問題点の一つは、キャプチャ対象テーブルリストの変更に対するサポートが最適とは言えないことでした。

ユーザーは、キャプチャ対象テーブルのリスト(table.include.list および関連オプション)を指定して新しいコネクタを作成しますが、 後々、当初 CDC に含まれていなかったテーブルもキャプチャするために、この設定を調整する必要が生じることがあります。 これらのテーブルからの変更をストリーミングするだけで十分であれば、この問題は非常に簡単に解決できます。 しかし、テーブルの既存の内容もキャプチャする必要がある場合はどうでしょうか?

#!/bin/sh
ips=(
66.249.71.9
66.249.71.8
66.249.71.7
66.249.66.89
66.249.66.88
66.249.66.78
66.249.66.69
#!/bin/sh
CATALOG_ID="AWS Account ID"
DATABASE="Glue Data Catalog Database"
IAM_ROLE="Execution IAM Role ARN"
tables=($(aws glue get-tables --catalog-id $CATALOG_ID \
--database-name $DATABASE \
--query 'TableList[*].Name' --output text))
SELECT
TABLE_SCHEMA,
TABLE_NAME
FROM
INFORMATION_SCHEMA.TABLES
WHERE
TABLE_TYPE = 'BASE TABLE'
AND TABLE_SCHEMA = '<database name>'
AND TABLE_NAME NOT IN (
SELECT DISTINCT TABLE_NAME