以下は、Confluent Cloud ドキュメント「Set up Databricks Delta Lake (AWS) Sink Connector for Confluent Cloud」の内容を日本語に要約し、Markdown形式で整理したものです。
このガイドは、Confluent Cloud の Databricks Delta Lake Sink コネクタを AWS 上で使用するために必要な準備手順を解説しています。**Exactly-once semantics(EOS)**を実現するために、データを一時的にステージングする S3 バケットの設定が必須です。以下の内容は Databricks および AWS CloudFormation の基本知識を要件としています。(docs.confluent.io)
- Kafka クラスターと同じリージョンにてワークスペースを作成してください。
- Databricks の Quickstart オプション(「I have data in S3…」)を選択して CloudFormation テンプレートを起動。
- 「Data bucket name」を指定します(後の S3 バケット作成時に使用)。(docs.confluent.io)
-
前ステップで指定した「Data bucket name」で S3 バケットを作成(デフォルト設定で可)。
-
AWS IAM で
<workspace-name>-access-data-buckets
という名前の IAM ロールを確認。 -
IAM ロールに下記のようなポリシーが適用されていることを確認:
{ "Statement": [ { "Action": [ "s3:ListBucket", "s3:PutObject", "s3:GetObject", "s3:DeleteObject", "s3:PutObjectAcl" ], "Resource": [ "arn:aws:s3:::<data-bucket-name>/*", "arn:aws:s3:::<data-bucket-name>" ], "Effect": "Allow" } ] }
-
バケットポリシーを設定し、Databricks の IAM ロールに対して必要なアクセスを明示:
{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::<aws-account-id-databricks>:role/<iam-role-for-data-bucket-access>" }, "Action": [ "s3:GetBucketLocation", "s3:ListBucket" ], "Resource": "arn:aws:s3:::<data-bucket-name>" }, { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::<aws-account-id-databricks>:role/<iam-role-for-data-bucket-access>" }, "Action": [ "s3:PutObject", "s3:GetObject", "s3:DeleteObject", "s3:PutObjectAcl" ], "Resource": "arn:aws:s3:::<data-bucket-name>/*" } ] }
-
Databricks ワークスペースの設定用ロール(Role ARN)を取得。
-
AWS IAM で以下のような幅広い EC2 リソース操作と
iam:PassRole
権限が含まれていることを確認:- EC2 操作全般(VPC、サブネット、インターネットゲートウェイ、セキュリティグループなど)
iam:PassRole
アクション許可(Databricks に特定 IAM ロールを渡すため)。(docs.confluent.io)
-
Databricks にて、CloudFormation で作成されたインスタンスプロファイルを使ってクラスタをデプロイ。
-
AWS 上でプログラム的アクセス可能な新規ユーザーを作成し、以下のポリシーをアタッチ:
{ "Statement": [ { "Action": ["s3:ListAllMyBuckets"], "Resource": "arn:aws:s3:::*", "Effect": "Allow" }, { "Action": ["s3:ListBucket", "s3:GetBucketLocation"], "Resource": "arn:aws:s3:::<bucket-name>", "Effect": "Allow" }, { "Action": [ "s3:PutObject", "s3:GetObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts", "s3:ListBucketMultipartUploads", "s3:PutObjectTagging", "s3:GetObjectTagging", "s3:DeleteObject" ], "Resource": "arn:aws:s3:::<bucket-name>/*", "Effect": "Allow" } ] }
これは、ステージング後にファイルを削除可能にし、コネクタが正しく動作するために重要です。(docs.confluent.io)
Databricks ノートブックなどを使い、以下の形式でテーブルを作成:
CREATE TABLE pageviews (
viewtime LONG,
userid STRING,
pageid STRING,
partition INT
) USING DELTA;
- コネクタは
partition
フィールドを追加するため、必ずpartition INT
を含める必要があります。(docs.confluent.io)
コネクタ設定時に必要な情報を以下にまとめます:
- Databricks のサーバーホスト名(例:
dbc-***.cloud.databricks.com
) - HTTP パス(例:
sql/protocolv1/o/...
) - Databricks のアクセストークン(User Settings → Access Tokens)
- Delta Lake テーブル名(例:
pageviews
) - AWS の Access Key ID と Secret Access Key
- ステージング用 S3 バケット名(例:
confluent-databricks
)(docs.confluent.io)
1. Databricks ワークスペースを Kafka クラスタと同じリージョンに作成
2. S3 ステージングバケットを作成し、IAM ロール/バケットポリシーを設定
3. ワークスペース用の IAM ロールに適切な EC2 および iam:PassRole ポリシーを確認
4. Databricks クラスタ用のインスタンスプロファイルと AWS ユーザー(S3 アクセス付与)を準備
5. Delta Lake に対応したテーブルを Databricks に作成(必ず partition INT を含める)
6. Connector 設定に必要な情報(ホスト名、HTTP パス、アクセストークン、AWS クレデンシャルなど)を取得