zookeeper 与 kafka 集群设置教程, 作为教程, 下文描述的步骤是在 1台 服务器上部署 启动三个 zookeeper 和 kafka 服务以组成模拟集群, 多机上的注意事项会在相关位置说明
FIXME
的均为在实际环境中需要注意的值(比如 ip, 端口, 文件路径), 其余值新增内容均为固定值
mkdir -p /data/kafka-cluster-test/{kafka,zookeeper}-data-{1,2,3}
# 这里仍使用项目中使用的版本: kafka_2.11-0.10.0.0
cd /data/kafka-cluster-test && wget https://archive.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
tar -xf kafka_2.11-0.10.0.0.tgz && mv kafka_2.11-0.10.0.0 kafka
- 参考: https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka/83858fdc451f1e83e612df5230a10ee134a61240 , 注意自定义密码及文件名称 这里我修改了 sh 脚本(脚本有所修改, 见下方附件)中的 PASS 为 'Kafka.Cluster@2018', 下列是我使用的 4 条生成证书文件的命令:
# 这里将证书保存在了 /data/kafka-cluster-test/ssl/ 路径下
mkdir -p /data/kafka-cluster-test/ssl/
cd /data/kafka-cluster-test/ssl/
# 该步骤生成 ca 根证书 (用于给其他证书签名做可信证书)
./tests/gen-ssl-certs.sh ca ca-cert Kafka.Cluster
# 该步骤生成 jks 格式的证书文件供 kafka broker 使用
./tests/gen-ssl-certs.sh -k server ca-cert kafka. Kafka.Cluster
# 该步骤生成 pem 格式的证书文件供 python 客户端 及 bro kafka plugin 使用 (librdkafka)
./tests/gen-ssl-certs.sh client ca-cert kafka. Kafka.Cluster
# 该步骤生成 jks 格式的证书文件供 logstash 的 kafka plugin 使用 (java client)
./tests/gen-ssl-certs.sh -k client ca-cert kafka. Kafka.Cluster
cd /data/kafka-cluster-test/kafka/config
cp zookeeper.properties zookeeper-1.properties
cp zookeeper.properties zookeeper-2.properties
cp zookeeper.properties zookeeper-3.properties
- 参照下列配置修改或添加配置项
tickTime=2000
# FIXME: zookeeper 数据存放地址
dataDir=/data/kafka-cluster-test/zookeeper-data-1
# FIXME: zookeeper 对外提供服务端口, 如果一台机器只运行一个 zookeeper 服务, 可以保留为默认值 2181
clientPort=22181
initLimit=5
syncLimit=2
# FIXME: zookeeper 集群的全部实例地址, server.x 中的 x 必须与 `dataDir`/myid 文件中的值相同, 两个端口在单节点单实例情况下推荐均使用相同端口 2881 和 3881
# 地址后的两个端口, 第一个是用来 follower 节点连接leader节点, 第二个端口用于 leader 选举
# see https://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_clusterOptions
server.1=10.0.81.9:22881:23881
server.2=10.0.81.9:22882:23882
server.3=10.0.81.9:22883:23883
# FIXME: 确保该文件内的值与 zookeeper 配置文件中的 server.x 中的 x 一致
echo 1 >> /data/kafka-cluster-test/zookeeper-data-1/myid
- 参照下列配置修改或添加配置项
tickTime=2000
# FIXME: zookeeper 数据存放地址
dataDir=/data/kafka-cluster-test/zookeeper-data-2
# FIXME: zookeeper 对外提供服务端口, 如果一台机器只运行一个 zookeeper 服务, 可以保留为默认值 2181
clientPort=22182
initLimit=5
syncLimit=2
# FIXME: zookeeper 集群的全部实例地址, server.x 中的 x 必须与 `dataDir`/myid 文件中的值相同, 两个端口在单节点单实例情况下推荐均使用相同端口 2881 和 3881
# 地址后的两个端口, 第一个是用来 follower 节点连接leader节点, 第二个端口用于 leader 选举
# see https://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_clusterOptions
server.1=10.0.81.9:22881:23881
server.2=10.0.81.9:22882:23882
server.3=10.0.81.9:22883:23883
# FIXME: 确保该文件内的值与 zookeeper 配置文件中的 server.x 中的 x 一致
echo 2 >> /data/kafka-cluster-test/zookeeper-data-2/myid
- 参照下列配置修改或添加配置项
tickTime=2000
# FIXME: zookeeper 数据存放地址
dataDir=/data/kafka-cluster-test/zookeeper-data-3
# FIXME: zookeeper 对外提供服务端口, 如果一台机器只运行一个 zookeeper 服务, 可以保留为默认值 2181
clientPort=22183
initLimit=5
syncLimit=2
# FIXME: zookeeper 集群的全部实例地址, server.x 中的 x 必须与 `dataDir`/myid 文件中的值相同, 两个端口在单节点单实例情况下推荐均使用相同端口 2881 和 3881
# 地址后的两个端口, 第一个是用来 follower 节点连接leader节点, 第二个端口用于 leader 选举
# see https://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_clusterOptions
server.1=10.0.81.9:22881:23881
server.2=10.0.81.9:22882:23882
server.3=10.0.81.9:22883:23883
# FIXME: 确保该文件内的值与 zookeeper 配置文件中的 server.x 中的 x 一致
echo 3 >> /data/kafka-cluster-test/zookeeper-data-3/myid
cd /data/kafka-cluster-test/kafka/config
cp server.properties server-1.properties
cp server.properties server-2.properties
cp server.properties server-3.properties
- 参照下列配置修改或添加配置项
# FIXME: broker ID, 每个 broker 必须不能相同
broker.id=1
# FIXME: 因为这里是单机部署多个 kafka broker, 因此我们使用自定义端口, 单机单 broker 推荐保留使用默认端口 9091
listeners=SSL://10.0.81.9:29091
advertised.listeners=SSL://10.0.81.9:29091
# FIXME: kafka 数据存放地址
log.dirs=/data/kafka-cluster-test/kafka-data-1
# FIXME: zookeeper 集群链接串
zookeeper.connect=10.0.81.9:22181,10.0.81.9:22182,10.0.81.9:22183
# 以下是 认证/加密相关配置
ssl.protocol = TLS
# FIXME: 注意修改该 jks 文件路径
ssl.keystore.location = /data/kafka-cluster-test/ssl/kafka.server.keystore.jks
ssl.keystore.password = Kafka.Cluster@2018
ssl.keystore.type = JKS
ssl.key.password = Kafka.Cluster@2018
# FIXME: 注意修改该 jks 文件路径
ssl.truststore.location = /data/kafka-cluster-test/ssl/kafka.server.truststore.jks
ssl.truststore.password = Kafka.Cluster@2018
ssl.truststore.type = JKS
ssl.client.auth = required
ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
security.inter.broker.protocol = SSL
- 参照下列配置修改或添加配置项
# FIXME: broker ID, 每个 broker 必须不能相同
broker.id=2
# FIXME: 因为这里是单机部署多个 kafka broker, 因此我们使用自定义端口, 单机单 broker 推荐保留使用默认端口 9091
listeners=SSL://10.0.81.9:29092
advertised.listeners=SSL://10.0.81.9:29092
# FIXME: kafka 数据存放地址
log.dirs=/data/kafka-cluster-test/kafka-data-2
# FIXME: zookeeper 集群链接串
zookeeper.connect=10.0.81.9:22181,10.0.81.9:22182,10.0.81.9:22183
# 以下是 认证/加密相关配置
ssl.protocol = TLS
# FIXME: 注意修改该 jks 文件路径
ssl.keystore.location = /data/kafka-cluster-test/ssl/kafka.server.keystore.jks
ssl.keystore.password = Kafka.Cluster@2018
ssl.keystore.type = JKS
ssl.key.password = Kafka.Cluster@2018
# FIXME: 注意修改该 jks 文件路径
ssl.truststore.location = /data/kafka-cluster-test/ssl/kafka.server.truststore.jks
ssl.truststore.password = Kafka.Cluster@2018
ssl.truststore.type = JKS
ssl.client.auth = required
ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
security.inter.broker.protocol = SSL
- 参照下列配置修改或添加配置项
# FIXME: broker ID, 每个 broker 必须不能相同
broker.id=3
# FIXME: 因为这里是单机部署多个 kafka broker, 因此我们使用自定义端口, 单机单 broker 推荐保留使用默认端口 9091
listeners=SSL://10.0.81.9:29093
advertised.listeners=SSL://10.0.81.9:29093
# FIXME: kafka 数据存放地址
log.dirs=/data/kafka-cluster-test/kafka-data-3
# FIXME: zookeeper 集群链接串
zookeeper.connect=10.0.81.9:22181,10.0.81.9:22182,10.0.81.9:22183
# 以下是 认证/加密相关配置
ssl.protocol = TLS
# FIXME: 注意修改该 jks 文件路径
ssl.keystore.location = /data/kafka-cluster-test/ssl/kafka.server.keystore.jks
ssl.keystore.password = Kafka.Cluster@2018
ssl.keystore.type = JKS
ssl.key.password = Kafka.Cluster@2018
# FIXME: 注意修改该 jks 文件路径
ssl.truststore.location = /data/kafka-cluster-test/ssl/kafka.server.truststore.jks
ssl.truststore.password = Kafka.Cluster@2018
ssl.truststore.type = JKS
ssl.client.auth = required
ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
security.inter.broker.protocol = SSL
如果开启了防火墙, 请将以下端口加入到白名单
# FIXME: 如果单机单实例, 每个实例使用相同端口的话, 可能就只需要配置4个: 2181, 2881, 3881, 9091
22181
22182
22183
22881
22882
22883
23881
23882
23883
29091
29092
29093
[unix_http_server]
file=/data/kafka-cluster-test/supervisor.sock ; (the path to the socket file)
[supervisord]
logfile=/data/kafka-cluster-test/supervisord.log ; (main log file;default $CWD/supervisord.log)
logfile_maxbytes=50MB ; (max main logfile bytes b4 rotation;default 50MB)
logfile_backups=10 ; (num of main logfile rotation backups;default 10)
loglevel=info ; (log level;default info; others: debug,warn,trace)
pidfile=/data/kafka-cluster-test/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
nodaemon=false ; (start in foreground if true;default false)
minfds=1024 ; (min. avail startup file descriptors;default 1024)
minprocs=200 ; (min. avail process descriptors;default 200)
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///data/kafka-cluster-test/supervisor.sock ; use a unix:// URL for a unix socket
[program:zookeeper-1]
command=/data/kafka-cluster-test/kafka/bin/zookeeper-server-start.sh /data/kafka-cluster-test/kafka/config/zookeeper-1.properties
user=root
stdout_logfile=/data/kafka-cluster-test/super-zookeeper-1.log
redirect_stderr=true
autostart=true
autorestart=true
startsecs=10
killasgroup=true
stopasgroup=true
priority=900
[program:zookeeper-2]
command=/data/kafka-cluster-test/kafka/bin/zookeeper-server-start.sh /data/kafka-cluster-test/kafka/config/zookeeper-2.properties
user=root
stdout_logfile=/data/kafka-cluster-test/super-zookeeper-2.log
redirect_stderr=true
autostart=true
autorestart=true
startsecs=10
killasgroup=true
stopasgroup=true
priority=900
[program:zookeeper-3]
command=/data/kafka-cluster-test/kafka/bin/zookeeper-server-start.sh /data/kafka-cluster-test/kafka/config/zookeeper-3.properties
user=root
stdout_logfile=/data/kafka-cluster-test/super-zookeeper-3.log
redirect_stderr=true
autostart=true
autorestart=true
startsecs=10
killasgroup=true
stopasgroup=true
priority=900
[program:kafka-1]
command=/data/kafka-cluster-test/kafka/bin/kafka-server-start.sh /data/kafka-cluster-test/kafka/config/server-1.properties
user=root
stdout_logfile=/data/kafka-cluster-test/super-kafka-1.log
redirect_stderr=true
autostart=true
autorestart=true
startsecs=10
killasgroup=true
stopasgroup=true
priority=910
[program:kafka-2]
command=/data/kafka-cluster-test/kafka/bin/kafka-server-start.sh /data/kafka-cluster-test/kafka/config/server-2.properties
user=root
stdout_logfile=/data/kafka-cluster-test/super-kafka-2.log
redirect_stderr=true
autostart=true
autorestart=true
startsecs=10
killasgroup=true
stopasgroup=true
priority=910
[program:kafka-3]
command=/data/kafka-cluster-test/kafka/bin/kafka-server-start.sh /data/kafka-cluster-test/kafka/config/server-3.properties
user=root
stdout_logfile=/data/kafka-cluster-test/super-kafka-3.log
redirect_stderr=true
autostart=true
autorestart=true
startsecs=10
killasgroup=true
stopasgroup=true
priority=910
[group:zookeeper]
programs=zookeeper-1,zookeeper-2,zookeeper-3
priority=900
[group:kafka]
programs=kafka-1,kafka-2,kafka-3
priority=910
supervisord -c /data/kafka-cluster-test/kafka-cluster-supervisord.conf
- 使用 supervisorctl -c /data/kafka-cluster-test/kafka-cluster-supervisord.conf 进行查看管理
Producer({
'client.id': 'kafka-cluster-test',
"enable.auto.commit": True,
# 以下为需要新增或者调整的参数
# FIXME: 指定所有的 kafka broker 实例地址
"bootstrap.servers": '10.0.81.9:29091,10.0.81.9:29092,10.0.81.9:29093',
"security.protocol": "SSL",
# FIXME 注意文件存放路径
"ssl.ca.location": "ssl/ca-cert",
# FIXME 注意文件存放路径
"ssl.certificate.location": "ssl/kafka.client.pem",
# FIXME 注意文件存放路径
"ssl.key.location": "ssl/kafka.client.key",
# FIXME 第 2 步中使用 sh 脚本中的 PASS 值
"ssl.key.password": "Kafka.Cluster@2018"
})
Consumer({
'group.id': 'kafka-cluster-test',
'client.id': 'kafka-cluster-test',
"enable.auto.commit": True,
# 以下为需要新增或者调整的参数
# FIXME: 指定所有的 kafka broker 实例地址
"bootstrap.servers": '10.0.81.9:29091,10.0.81.9:29092,10.0.81.9:29093',
"security.protocol": "SSL",
# FIXME 注意文件存放路径
"ssl.ca.location": "ssl/ca-cert",
# FIXME 注意文件存放路径
"ssl.certificate.location": "ssl/kafka.client.pem",
# FIXME 注意文件存放路径
"ssl.key.location": "ssl/kafka.client.key",
# FIXME 第 2 步中使用 sh 脚本中的 PASS 值
"ssl.key.password": "Kafka.Cluster@2018"
})
redef Kafka::kafka_conf = table(
["metadata.broker.list"] = "10.0.81.9:29091,10.0.81.9:29092,10.0.81.9:29093",
["client.id"] = "smaug.raw",
# 以下为需要新增或者调整的参数
# FIXME: 指定所有的 kafka broker 实例地址
["security.protocol"] = "SSL",
# FIXME 注意文件存放路径
["ssl.ca.location"] = "/data/kafka-cluster-test/ssl/kafka-client-ssl/ca-cert",
# FIXME 注意文件存放路径
["ssl.certificate.location"] = "/data/kafka-cluster-test/ssl/kafka-client-ssl/kafka.client.pem",
# FIXME 注意文件存放路径
["ssl.key.location"] = "/data/kafka-cluster-test/ssl/kafka-client-ssl/kafka.client.key",
# FIXME 第 2 步中使用 sh 脚本中的 PASS 值
["ssl.key.password"] = "Kafka.Cluster@2018"
);
kafka {
codec => plain {
format => "%{message}"
}
topic_id => "mytopic"
# 以下为需要新增或者调整的参数
# FIXME: 指定所有的 kafka broker 实例地址
bootstrap_servers => "10.0.81.9:29091,10.0.81.9:29092,10.0.81.9:29093"
client_id => "smaug.raw"
security_protocol => "SSL"
# FIXME 第 2 步中使用 sh 脚本中的 PASS 值
ssl_key_password => "Kafka.Cluster@2018"
# FIXME 注意文件存放路径
ssl_keystore_location => "/data/kafka-cluster-test/ssl/kafka.client.keystore.jks"
# FIXME 第 2 步中使用 sh 脚本中的 PASS 值
ssl_keystore_password => "Kafka.Cluster@2018"
ssl_keystore_type => "JKS"
# FIXME 注意文件存放路径
ssl_truststore_location => "/data/kafka-cluster-test/ssl/kafka.client.truststore.jks"
# FIXME 第 2 步中使用 sh 脚本中的 PASS 值
ssl_truststore_password => "Kafka.Cluster@2018"
ssl_truststore_type => "JKS"
}