https://docs.confluent.io/cloud/current/cp-component/connect-cloud-config.html
以下は、リンク先「Connect Self-Managed Kafka Connect to Confluent Cloud」のドキュメントを日本語訳し、Markdown形式に整形したものです。原文の構成・見出しをできるだけ保持していますが、日本語訳として読みやすく調整しています。
Confluent Cloud にまだ対応していないコネクタを使いたい場合、自己管理(オンプレミスまたは任意の環境)で Kafka Connect クラスターを動かすことができます。このページでは、ローカル Connect クラスターを Confluent Cloud の Kafka クラスターに接続するための設定方法を説明します。 (Confluent Docs)
- Confluent Cloud へのアクセス権
- Confluent CLI
curljq
💡 ヒント Confluent Cloud の UI で環境とクラスタを選択し、Tools and client configuration > CLI Tools を利用すると、クラスタ設定ファイルを自動生成したり、ローカルクライアントとの接続手順を案内してくれる機能があります。 (Confluent Docs)
ソースコネクタが書き込む Kafka トピックは、事前に Confluent Cloud 側で手動作成する必要があります。 (Confluent Docs)
例:page_visits という名前のトピックを 1 パーティションで作成
confluent kafka topic create --partitions 1 page_visits最新の Confluent Platform を ZIP または TAR でダウンロードし、スタンドアローン構成または分散構成での利用に応じて設定します。 (Confluent Docs)
以下では、設定ファイル例を示します。 <cloud-bootstrap-servers> や <api-key>、<api-secret> などはあなたの環境に応じて置き換えてください。 (Confluent Docs)
⚠️ 本番運用に入る前に、Kafka Connect のワーカー設定およびコネクタ設定の意味を理解しておくようにしてください。(Confluent Docs)
my-connect-standalone.properties(Connect ワーカー設定)例:
bootstrap.servers=<cloud-bootstrap-servers>
# Kafka と Connect 間のデータ形式 (キー/値の変換器)
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# 内部データ (offset / config / status) 用変換器
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# オフセットをローカルファイルに保存 (テスト用途向け)
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
# セキュリティ設定 (SASL/SSL)
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<api-key>" password="<api-secret>";
security.protocol=SASL_SSL
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<api-key>" password="<api-secret>";
consumer.security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<api-key>" password="<api-secret>";
producer.security.protocol=SASL_SSL
# プラグイン (コネクタ・変換器・SMT 等) をロードするパス
plugin.path=/usr/share/java,/Users/<username>/confluent-6.2.1/share/confluent-hub-componentsmy-file-sink.properties(FileStreamSink コネクタ設定例):
name=my-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=page_visits
file=my_file.txt自己管理のコネクタで Enterprise 機能を利用する場合、以下の設定 (confluent.topic 関連) をコネクタ設定に含める必要があります: (Confluent Docs)
confluent.topic.bootstrap.servers=<cloud-bootstrap-servers>
confluent.topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
confluent.topic.security.protocol=SASL_SSL
confluent.topic.sasl.mechanism=PLAINまた、Reporter を使ってレスポンスを Kafka に書き戻すタイプのコネクタでは、以下のような設定も必要です: (Confluent Docs)
reporter.admin.bootstrap.servers=<cloud-bootstrap-servers>
reporter.admin.sasl.jaas.config=… (同様に API キー/シークレット指定)
reporter.admin.security.protocol=SASL_SSL
reporter.admin.sasl.mechanism=PLAIN
reporter.producer.bootstrap.servers=<cloud-bootstrap-servers>
reporter.producer.sasl.jaas.config=…
reporter.producer.security.protocol=SASL_SSL
reporter.producer.sasl.mechanism=PLAINさらに、Debezium(2.x 以降/1.9 以下)、Oracle XStream CDC の場合には、スキーマ履歴やデータベース履歴の Kafka 内部への書き込みに使う設定を追加する必要があります: (Confluent Docs)
schema.history.internal.kafka.bootstrap.servers=<cloud-bootstrap-servers>
schema.history.internal.consumer.security.protocol=SASL_SSL
schema.history.internal.consumer.ssl.endpoint.identification.algorithm=https
schema.history.internal.consumer.sasl.mechanism=PLAIN
schema.history.internal.consumer.sasl.jaas.config=… (キー・シークレット指定)
schema.history.internal.producer.security.protocol=SASL_SSL
schema.history.internal.producer.ssl.endpoint.identification.algorithm=https
schema.history.internal.producer.sasl.mechanism=PLAIN
schema.history.internal.producer.sasl.jaas.config=…
database.history.kafka.bootstrap.servers=<cloud-bootstrap-servers>
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.ssl.endpoint.identification.algorithm=https
database.history.consumer.sasl.mechanism=PLAIN
database.history.consumer.sasl.jaas.config=…
database.history.producer.security.protocol=SASL_SSL
database.history.producer.ssl.endpoint.identification.algorithm=https
database.history.producer.sasl.mechanism=PLAIN
database.history.producer.sasl.jaas.config=…最後に Connect を起動:
./bin/connect-standalone ./etc/my-connect-standalone.properties ./etc/my-file-sink.propertiesこのように起動すれば、前段で生成しておいた Kafka レコードが my_file.txt に出力されるはずです。 (Confluent Docs)
my-connect-distributed.properties(Connect ワーカー設定)例:
bootstrap.servers=<cloud-bootstrap-servers>
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Connect クラスターが管理する内部トピック設定
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
# セキュリティ設定 (SASL/SSL)
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<api-key>" password="<api-secret>";
security.protocol=SASL_SSL
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<api-key>" password="<api-secret>";
consumer.security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<api-key>" password="<api-secret>";
producer.security.protocol=SASL_SSL
plugin.path=/usr/share/java,/Users/<username>/confluent-6.2.1/share/confluent-hub-componentsさらに、Schema Registry 連携のための設定を追加することも可能です(Avro 変換器などを使う場合)。 (Confluent Docs)
起動コマンド例:
./bin/connect-distributed ./etc/my-connect-distributed.properties動作確認用に FileStreamSink コネクタを JSON フォーマットで作成する例も記載されています。 (Confluent Docs)
分散構成でも、前述の Enterprise 機能利用時/Reporter 利用時/Debezium 等での履歴設定 は、コネクタ設定に含める必要があります。 (Confluent Docs)