Step-by-step guide for a multi-node Confluent Kafka Platform and Cassandra cluster.
It is a multi-node deployment of https://github.com/ferhtaydn/sack
Assume that we have five Ubuntu 14.04 nodes. Their IPs are as follows:
- 12.0.5.4
- 12.0.5.5
- 12.0.5.6
- 12.0.1.170
- 12.0.1.171
All installation and configuration steps are shown below for node 12.0.5.4, but you need to repeat them on every node.
Many problems arise from the host settings, so make sure that your /etc/hosts file looks like this:
127.0.0.1 localhost 12.0.5.4
127.0.0.1 localhost
127.0.0.1 ip-12-0-5-4 # ip-12-0-5-4 is the hostname of your node
::1 ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
ff02::3 ip6-allhosts
Before you run any of the nohup commands below, make sure that all of the following directories are owned by the user you run those commands as.
- /var/lib/kafka*
- /var/lib/zookeeper
- /var/log/kafka/
- /usr/bin/kafka*
- /usr/bin/zookeeper*
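A minimal sketch of the ownership change, assuming your login user is named ubuntu (replace it with your own user):
sudo chown -R ubuntu:ubuntu /var/lib/kafka* /var/lib/zookeeper /var/log/kafka/
sudo chown ubuntu:ubuntu /usr/bin/kafka* /usr/bin/zookeeper*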
Install Java first (this follows https://www.digitalocean.com/community/tutorials/how-to-install-java-with-apt-get-on-ubuntu-16-04):
java -version
sudo apt-get update
sudo apt-get install default-jre
sudo apt-get install default-jdk
sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer
sudo update-alternatives --config java
sudo vi /etc/environment # add JAVA_HOME="/usr/lib/jvm/java-8-oracle"
source /etc/environment
Now install Cassandra from the Apache Debian repository:
echo "deb http://www.apache.org/dist/cassandra/debian 36x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list
curl https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -
sudo apt-get update
sudo apt-get install cassandra
sudo service cassandra stop
sudo rm -rf /var/lib/cassandra/data/system/*
sudo vi /etc/cassandra/cassandra.yaml
Change the properties below in cassandra.yaml:
seeds: "12.0.5.4,12.0.5.5,12.0.5.6,12.0.1.170,12.0.1.171"
listen_address: 12.0.5.4
rpc_address: 12.0.5.4
endpoint_snitch: GossipingPropertyFileSnitch
auto_bootstrap: false
sudo service cassandra start
sudo nodetool status
cqlsh 12.0.5.4 9042
Now install the Confluent Platform 3.0:
wget -qO - http://packages.confluent.io/deb/3.0/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/3.0 stable main"
sudo apt-get update && sudo apt-get install confluent-platform-2.11
Here is your final /etc/kafka/zookeeper.properties file:
dataDir=/var/lib/zookeeper
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=0.0.0.0:2888:3888 # 0.0.0.0 here because this file is on node 12.0.5.4; on each node, use 0.0.0.0 for that node's own entry
server.2=12.0.5.5:2888:3888
server.3=12.0.5.6:2888:3888
server.4=12.0.1.170:2888:3888
server.5=12.0.1.171:2888:3888
echo "1" | sudo tee /var/lib/zookeeper/myid # on node 12.0.5.4
echo "2" | sudo tee /var/lib/zookeeper/myid # on node 12.0.5.5, and so on
echo "3" | sudo tee /var/lib/zookeeper/myid # on node 12.0.5.6
echo "4" | sudo tee /var/lib/zookeeper/myid # on node 12.0.1.170
echo "5" | sudo tee /var/lib/zookeeper/myid # on node 12.0.1.171
Run ZooKeeper on each node to form the cluster:
nohup /usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties > /dev/null 2>&1 &
You can check whether ZooKeeper is up, and which mode it is in, with the echo stat | nc localhost 2181 | grep Mode command.
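As a quick sketch, assuming port 2181 is reachable between the nodes, you can check all five at once; one node should report Mode: leader and the others Mode: follower:
for h in 12.0.5.4 12.0.5.5 12.0.5.6 12.0.1.170 12.0.1.171; do echo -n "$h "; echo stat | nc $h 2181 | grep Mode; done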
You need to change the lines below in your /etc/kafka/server.properties file:
broker.id=0
port=9092
host.name=12.0.5.4
default.replication.factor=3
delete.topic.enable=true
log.dirs=/var/lib/kafka
zookeeper.connect=12.0.5.4:2181,12.0.5.5:2181,12.0.5.6:2181,12.0.1.170:2181,12.0.1.171:2181
broker.id is incremented on each of the other nodes (e.g. node 12.0.5.5 has broker.id=1).
You can then run the Kafka server on each node:
nohup /usr/bin/kafka-server-start /etc/kafka/server.properties > /dev/null 2>&1 &
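To sanity-check that each broker has registered itself in ZooKeeper, you can list the broker ids; zookeeper-shell is installed with the Confluent packages, and the path below is assumed to match the other /usr/bin scripts used in this guide:
/usr/bin/zookeeper-shell localhost:2181 ls /brokers/ids
# once all five brokers are up, the listed ids should be [0, 1, 2, 3, 4]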
You need to change the lines below in your /etc/schema-registry/schema-registry.properties file:
listeners=http://12.0.5.4:8081
kafkastore.connection.url=12.0.5.4:2181,12.0.5.5:2181,12.0.5.6:2181,12.0.1.170:2181,12.0.1.171:2181
kafkastore.topic=_schemas
debug=false
And you can run the Schema Registry like this:
nohup /usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties > /dev/null 2>&1 &
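You can confirm the Schema Registry is up through its REST API, for example:
curl http://12.0.5.4:8081/subjects
# returns a JSON array of registered subjects; it is empty ([]) on a fresh install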
You can create a topic, for example:
kafka-topics --create --topic mtmsgs --partitions 1 --replication-factor 1 --if-not-exists --zookeeper localhost:2181
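You can then verify the topic and its partition assignment:
kafka-topics --describe --topic mtmsgs --zookeeper localhost:2181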
You need to change the following lines in your /etc/kafka/connect-distributed.properties file if you are using Avro for serializing the items in the topics.
bootstrap.servers=12.0.5.4:9092,12.0.5.5:9092,12.0.5.6:9092,12.0.1.170:9092,12.0.1.171:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://12.0.5.4:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://12.0.5.4:8081
You could run Kafka Connect directly with nohup /usr/bin/connect-distributed /etc/kafka/connect-distributed.properties > /dev/null 2>&1 &, but since we will use the Cassandra Sink, we will run Kafka Connect from the connector's directory with its jar added to the classpath.
You can build and install the Cassandra Sink connector like this:
## Build the connectors
git clone https://github.com/datamountaineer/stream-reactor
cd stream-reactor
gradle fatJar -x test
## Build the CLI for interacting with Kafka connectors
git clone https://github.com/datamountaineer/kafka-connect-tools
cd kafka-connect-tools
gradle fatJar -x test
cd stream-reactor/kafka-connect-cassandra/build/libs
export CLASSPATH=kafka-connect-cassandra-0.2.2-3.0.1-all.jar
nohup /usr/bin/connect-distributed /etc/kafka/connect-distributed.properties > /dev/null 2>&1 &
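Kafka Connect in distributed mode exposes a REST API, on port 8083 unless you changed rest.port in connect-distributed.properties; you can check that the worker is up and list running connectors with:
curl http://localhost:8083/connectors
# returns a JSON array of connector names; it is empty ([]) before any connector is created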
An example Cassandra Sink properties file, /etc/kafka/connect-cassandra-sink.properties, looks like this:
name=cassandra-sink-products
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=5
topics=product-csv-avro,product-http-avro
connect.cassandra.export.route.query=INSERT INTO products SELECT * FROM product-csv-avro;INSERT INTO products SELECT * FROM product-http-avro
connect.cassandra.contact.points=12.0.5.4
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra
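The sink expects the demo keyspace and the products table to already exist in Cassandra. A minimal sketch via cqlsh is shown below; the columns are only placeholders and must match the fields of your Avro records:
cqlsh 12.0.5.4 -e "CREATE KEYSPACE IF NOT EXISTS demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};"
# id, name and price are hypothetical columns; use your real product schema here
cqlsh 12.0.5.4 -e "CREATE TABLE IF NOT EXISTS demo.products (id int PRIMARY KEY, name text, price double);"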
A file source connector properties file, /etc/kafka/connect-file-source.properties, can look like this:
$ cat /etc/kafka/connect-file-source.properties
name=product-csv-source
connector.class=FileStreamSource
tasks.max=5
file=~/products/products.csv
topic=product-csv-raw
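Note that the file must exist on the node running the Connect worker and be readable by that user (and depending on how the worker resolves the path, you may need an absolute path instead of ~). For example:
mkdir -p ~/products
# put your CSV export into ~/products/products.csv; each line becomes one record on the product-csv-raw topic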
cd kafka-connect-tools/build/libs
java -jar kafka-connect-cli-0.7-all.jar ps
java -jar kafka-connect-cli-0.7-all.jar create cassandra-sink-products < /etc/kafka/connect-cassandra-sink.properties
java -jar kafka-connect-cli-0.7-all.jar ps
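You can also inspect the stored configuration of a connector with the same CLI; the get subcommand below is taken from the connect-cli usage, so check java -jar kafka-connect-cli-0.7-all.jar --help if it differs in your build:
java -jar kafka-connect-cli-0.7-all.jar get cassandra-sink-products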
NOTE: If you add a new connector to Kafka Connect, you need to do it on the leader worker; otherwise it returns the following error:
Error: the Kafka Connect API returned: Cannot complete request because of a conflicting operation (e.g. worker rebalance) (409). However, the other cluster members still respond successfully to GET /connectors requests.
You can check the contents of a topic with the Avro console consumer:
/usr/bin/kafka-avro-console-consumer --zookeeper 12.0.5.4:2181 --property schema.registry.url=http://12.0.5.4:8081 --topic your-topic-name --from-beginning
Finally, the application configuration (sack's application.conf) is pointed at the cluster like this:
akka {
  loglevel = "INFO"
  jvm-exit-on-fatal-error = on
}
http {
  host = 0.0.0.0
  port = 8080
}
kafka {
  producer {
    bootstrap.servers = "12.0.5.4:9092,12.0.5.5:9092,12.0.5.6:9092,12.0.1.170:9092,12.0.1.171:9092"
    acks = "all"
    //key.serializer = "io.confluent.kafka.serializers.KafkaAvroSerializer"
    //value.serializer = "io.confluent.kafka.serializers.KafkaAvroSerializer"
    schema.registry.url = "http://12.0.1.171:8081"
    zookeeper.connect = "12.0.1.171:2181"
  }
  consumer {
    bootstrap.servers = "12.0.5.4:9092,12.0.5.5:9092,12.0.5.6:9092,12.0.1.170:9092,12.0.1.171:9092"
    schema.registry.url = "http://12.0.1.171:8081"
    zookeeper.connect = "12.0.1.171:2181"
    enable.auto.commit = false
    auto.offset.reset = "earliest"
    max.partition.fetch.bytes = "1048576"
    schedule.interval = 1000
    unconfirmed.timeout = 3000
  }
}