So you want to connect Kafka Connect to Confluent Cloud? Here’s how.
- Configure the worker
- Pre-create any topics that your source connector will write to (since auto-topic creation is not enabled on Confluent Cloud)
- If you’re using any licensed connectors from Confluent, include the Confluent Cloud details in the connector configuration
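The topic pre-creation step can be scripted. The following is a minimal sketch, not the author's method: it assumes the `confluent` CLI is installed, logged in, and pointed at the right cluster, and that its `kafka topic create` command with the `--partitions` flag is available in your version. The helper names and the default of 6 partitions are hypothetical.

```python
import subprocess


def topic_create_command(topic: str, partitions: int = 6) -> list:
    """Build the CLI invocation that creates one topic (assumed CLI syntax)."""
    return ["confluent", "kafka", "topic", "create", topic,
            "--partitions", str(partitions)]


def create_topics(topics, partitions: int = 6, execute: bool = False) -> list:
    """Create each topic, or only print the commands when execute is False."""
    commands = [topic_create_command(topic, partitions) for topic in topics]
    for command in commands:
        if execute:
            # check=True raises CalledProcessError if the CLI reports a failure.
            subprocess.run(command, check=True)
        else:
            print(" ".join(command))
    return commands
```

Calling `create_topics(["orders", "customers"])` only prints the commands, which makes it easy to review them before running with `execute=True`.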
Configuring the worker breaks down into several parts:
- Specifying the Confluent Cloud broker connection details
- Optionally, configuring the Kafka Connect Avro converter to connect to Schema Registry in Confluent Cloud
Configure the worker to point to Confluent Cloud:
bootstrap.servers=<CCLOUD_BROKER_HOST>:9092
Confluent Cloud replicates your data for redundancy, so specify a replication factor of 3 for the worker's internal topics:
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
This set of four parameters is the necessary security configuration for a client to connect to Confluent Cloud. It’s repeated three times, for the admin client, producer, and consumer in Kafka Connect.
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";
consumer.security.protocol=SASL_SSL
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";
producer.security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";
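Because the same four settings appear under three prefixes, one way to avoid copy-and-paste slips is to generate them. This is an illustrative sketch only: `security_properties` is a hypothetical helper name, and the key and secret passed in are placeholders for your own.

```python
def security_properties(api_key: str, api_secret: str) -> list:
    """Return the four security settings under each of the three prefixes."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username\\="{api_key}" password\\="{api_secret}";'
    )
    base = {
        "security.protocol": "SASL_SSL",
        "ssl.endpoint.identification.algorithm": "https",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
    }
    lines = []
    # The empty prefix gives the unprefixed settings; the other two give the
    # consumer.* and producer.* copies shown in the properties above.
    for prefix in ("", "consumer.", "producer."):
        for key, value in base.items():
            lines.append(f"{prefix}{key}={value}")
    return lines


if __name__ == "__main__":
    print("\n".join(security_properties("<CCLOUD_API_KEY>", "<CCLOUD_API_SECRET>")))
```

The output can be pasted into the worker properties file; keeping the credentials in one place means a rotated API key only has to be changed once.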
This configuration sets Avro as the Kafka Connect worker's default converter for message values, and defines how to access the Schema Registry in Confluent Cloud.
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
value.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
If you want to use Avro for your message keys, repeat the configuration with a key. prefix:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.schema.registry.basic.auth.user.info=<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>
key.converter.schema.registry.url=https://<SCHEMA_REGISTRY_ENDPOINT>
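Before starting the worker it can help to confirm that the Schema Registry endpoint and API key actually work. Below is a minimal sketch using only the Python standard library. It assumes the standard Schema Registry `GET /subjects` endpoint and HTTP basic authentication, which is how `USER_INFO` credentials are sent; `build_request` and `list_subjects` are hypothetical helper names.

```python
import base64
import json
import urllib.request


def build_request(sr_url: str, api_key: str, api_secret: str) -> urllib.request.Request:
    """Build a GET /subjects request carrying the key and secret as basic auth."""
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        sr_url.rstrip("/") + "/subjects",
        headers={"Authorization": f"Basic {token}"},
    )


def list_subjects(sr_url: str, api_key: str, api_secret: str) -> list:
    """Return the registered subjects; bad credentials raise urllib.error.HTTPError."""
    request = build_request(sr_url, api_key, api_secret)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)
```

Calling `list_subjects` with the same URL, key, and secret that go into the converter settings should return a list (possibly empty) rather than raising an error.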
If you’re using a licensed connector from Confluent, you need to include the following in your connector configuration:
[…]
"confluent.topic.bootstrap.servers" : "${CCLOUD_BROKER_HOST}:9092",
"confluent.topic.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";",
"confluent.topic.security.protocol": "SASL_SSL",
"confluent.topic.ssl.endpoint.identification.algorithm": "https",
"confluent.topic.sasl.mechanism": "PLAIN"
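If you build connector configurations in code, the five `confluent.topic.*` properties can be merged in from the same values the worker uses. This is a sketch under stated assumptions: `with_license_settings` is a hypothetical helper, and the connector class in the usage example is a placeholder rather than a real class.

```python
import json


def with_license_settings(connector_config: dict, broker_host: str,
                          api_key: str, api_secret: str) -> dict:
    """Return a copy of connector_config with the confluent.topic.* settings added."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{api_key}" password="{api_secret}";'
    )
    merged = dict(connector_config)
    merged.update({
        "confluent.topic.bootstrap.servers": f"{broker_host}:9092",
        "confluent.topic.sasl.jaas.config": jaas,
        "confluent.topic.security.protocol": "SASL_SSL",
        "confluent.topic.ssl.endpoint.identification.algorithm": "https",
        "confluent.topic.sasl.mechanism": "PLAIN",
    })
    return merged


if __name__ == "__main__":
    # "example.PlaceholderConnector" is not a real class; substitute your own.
    config = with_license_settings(
        {"connector.class": "example.PlaceholderConnector"},
        "xxxxx.confluent.cloud", "foo", "zzzz",
    )
    # json.dumps escapes the inner double quotes of the JAAS string.
    print(json.dumps(config, indent=2))
```

Serialising with `json.dumps` produces the `\"` escaping seen in the JSON above, which is easy to get wrong when editing by hand.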
(Why should you use Avro? This is why.)
Create a .env file in the same folder as the Docker Compose file, containing a value for each variable:
CCLOUD_BROKER_HOST=xxxxx.confluent.cloud
CCLOUD_API_KEY=foo
CCLOUD_API_SECRET=zzzz
CCLOUD_SCHEMA_REGISTRY_URL=https://yyyyyy.aws.confluent.cloud
CCLOUD_SCHEMA_REGISTRY_API_KEY=yyyy
CCLOUD_SCHEMA_REGISTRY_API_SECRET=yyyy
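Docker Compose substitutes an empty string for any variable it cannot find, so a typo in the .env file tends to surface later as an authentication failure. A small check like the following can catch that early. It is a sketch: `parse_env` and `missing_variables` are hypothetical helpers that handle only plain `KEY=value` lines, not quoting or multi-line values.

```python
REQUIRED = (
    "CCLOUD_BROKER_HOST",
    "CCLOUD_API_KEY",
    "CCLOUD_API_SECRET",
    "CCLOUD_SCHEMA_REGISTRY_URL",
    "CCLOUD_SCHEMA_REGISTRY_API_KEY",
    "CCLOUD_SCHEMA_REGISTRY_API_SECRET",
)


def parse_env(text: str) -> dict:
    """Parse simple KEY=value lines, skipping blank lines and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def missing_variables(text: str, required=REQUIRED) -> list:
    """Return the required names that are absent or empty, in declaration order."""
    values = parse_env(text)
    return [name for name in required if not values.get(name)]
```

Reading the .env file and passing its contents to `missing_variables` returns an empty list when every variable has a value.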
The following is a complete Docker Compose file that you can use to provision a Kafka Connect worker connecting to Confluent Cloud.
---
version: '3'
services:
  kafka-connect-01:
    image: confluentinc/cp-kafka-connect:5.3.1
    container_name: kafka-connect-01
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "${CCLOUD_BROKER_HOST}:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-01'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "${CCLOUD_SCHEMA_REGISTRY_URL}"
      CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "${CCLOUD_SCHEMA_REGISTRY_API_KEY}:${CCLOUD_SCHEMA_REGISTRY_API_SECRET}"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "${CCLOUD_SCHEMA_REGISTRY_URL}"
      CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "${CCLOUD_SCHEMA_REGISTRY_API_KEY}:${CCLOUD_SCHEMA_REGISTRY_API_SECRET}"
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
      #
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
      #
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
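Most of the environment variables in the Compose file correspond to worker properties: the cp-kafka-connect image derives the property name from each `CONNECT_`-prefixed variable. For the dot-separated names used in this post the relationship can be sketched as follows. The helper names are hypothetical, and the sketch deliberately ignores property names that contain hyphens or underscores, which the image maps differently.

```python
def to_env_name(property_name: str) -> str:
    """Map a dot-separated worker property to its CONNECT_ environment variable."""
    return "CONNECT_" + property_name.upper().replace(".", "_")


def to_property_name(env_name: str) -> str:
    """Inverse mapping, valid only for names that to_env_name could have produced."""
    prefix = "CONNECT_"
    if not env_name.startswith(prefix):
        raise ValueError(f"not a Connect variable: {env_name}")
    return env_name[len(prefix):].lower().replace("_", ".")


if __name__ == "__main__":
    for name in ("bootstrap.servers", "consumer.sasl.mechanism"):
        print(name, "->", to_env_name(name))
```

This makes it straightforward to translate between a Docker Compose environment block and an equivalent properties file for a worker run outside Docker.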
bootstrap.servers=<CCLOUD_BROKER_HOST>:9092
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";
consumer.security.protocol=SASL_SSL
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";
producer.security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>
value.converter.schema.registry.url=<SR_URL>
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.schema.registry.basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>
key.converter.schema.registry.url=<SR_URL>
group.id=connect-cluster
config.storage.topic=_kafka-connect-group-01-configs
offset.storage.topic=_kafka-connect-group-01-offsets
status.storage.topic=_kafka-connect-group-01-statuses
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
plugin.path=share/java/,share/confluent-hub-components
Create a .env file in the same folder as the Docker Compose file, containing a value for each variable:
CCLOUD_BROKER_HOST=xxxxx.confluent.cloud
CCLOUD_API_KEY=foo
CCLOUD_API_SECRET=zzzz
The following is a complete Docker Compose file, this time using the JSON converter rather than Avro, that you can use to provision a Kafka Connect worker connecting to Confluent Cloud.
---
version: '3'
services:
  kafka-connect-01:
    image: confluentinc/cp-kafka-connect:5.3.1
    container_name: kafka-connect-01
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "${CCLOUD_BROKER_HOST}:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-01'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_SASL_MECHANISM: "PLAIN"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
      #
      CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
      #
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
      CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${CCLOUD_API_KEY}\" password=\"${CCLOUD_API_SECRET}\";"
bootstrap.servers=<CCLOUD_BROKER_HOST>:9092
security.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";
consumer.security.protocol=SASL_SSL
consumer.ssl.endpoint.identification.algorithm=https
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";
producer.security.protocol=SASL_SSL
producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="<CCLOUD_API_KEY>" password\="<CCLOUD_API_SECRET>";
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
group.id=connect-cluster
config.storage.topic=_kafka-connect-group-01-configs
offset.storage.topic=_kafka-connect-group-01-offsets
status.storage.topic=_kafka-connect-group-01-statuses
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
plugin.path=share/java/,share/confluent-hub-components
An important point: on Confluent Cloud most connectors are licensed, so they need access to the license topic and the corresponding ACLs: https://docs.confluent.io/platform/current/connect/license.html
Otherwise you get what look like SASL authentication errors but are really a failure to access the license stored in the cluster.