https://kafka.apache.org/documentation/#quickstart
$ brew install kafka
==> Summary
🍺 /usr/local/Cellar/kafka/2.7.0: 187 files, 65.4MB
==> Caveats
==> openjdk
For the system Java wrappers to find this JDK, symlink it with
sudo ln -sfn /usr/local/opt/openjdk/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk.jdk
openjdk is keg-only, which means it was not symlinked into /usr/local,
because it shadows the macOS `java` wrapper.
If you need to have openjdk first in your PATH run:
echo 'export PATH="/usr/local/opt/openjdk/bin:$PATH"' >> ~/.zshrc
For compilers to find openjdk you may need to set:
export CPPFLAGS="-I/usr/local/opt/openjdk/include"
==> zookeeper
To have launchd start zookeeper now and restart at login:
brew services start zookeeper
Or, if you don't want/need a background service you can just run:
zkServer start
==> kafka
To have launchd start kafka now and restart at login:
brew services start kafka
Or, if you don't want/need a background service you can just run:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
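With both services running, a quick check that ZooKeeper (2181) and Kafka (9092) are listening (a sketch using lsof; the high-numbered ephemeral ports will differ per run):
$ lsof -nP -iTCP -sTCP:LISTEN | grep java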
java TCP *:54472 (LISTEN)
java TCP *:2181 (LISTEN)
java TCP *:54471 (LISTEN)
java TCP *:9092 (LISTEN)
$ cd /usr/local/etc/kafka
$ ls
connect-console-sink.properties
connect-console-source.properties
connect-distributed.properties
connect-file-sink.properties
connect-file-source.properties
connect-log4j.properties
connect-mirror-maker.properties
connect-standalone.properties
consumer.properties
log4j.properties
producer.properties
server.properties
tools-log4j.properties
trogdor.conf
zookeeper.properties
https://enfuse.io/a-diy-guide-to-kafka-connectors/
https://github.com/SINTEF-9012/kafka-mqtt-sink-connector
https://github.com/SINTEF-9012/kafka-mqtt-source-connector
https://github.com/johanvandevenne/kafka-connect-mqtt
Logs
$ cd /usr/local/var/lib/kafka-logs
$ cd /usr/local/var/lib/zookeeper/version-2
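To peek inside a partition segment (a sketch; the segment file name is offset-based and the test-0 directory assumes the test topic created below):
$ kafka-dump-log --files /usr/local/var/lib/kafka-logs/test-0/00000000000000000000.log --print-data-log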
http://kafka.apache.org/081/documentation/#quickstart
https://supergloo.com/kafka-connect/running-kafka-connect-standalone-vs-distributed-mode-examples/
https://github.com/johanvandevenne/kafka-connect-mqtt
Prerequisites (tested versions)
| OS | Java | Maven | EMQX |
|---|---|---|---|
| macOS Big Sur | openjdk 15.0.1 | 3.6.3 | 4.2.7 |
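Quick sanity check that the local toolchain matches:
$ java -version
$ mvn -version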
Build the connector jar files
$ cd /kafka-connect-mqtt/
$ mvn clean install
Copy the jar files into a kafka folder under the openjdk install
$ mkdir /usr/local/Cellar/openjdk/15.0.1/kafka/
$ cp -r ./target/kafka-connect-mqtt-1.1.1-package/kafka-connect-mqtt /usr/local/Cellar/openjdk/15.0.1/kafka
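Optionally verify the plugin landed where plugin.path will point (path taken from the copy above):
$ ls /usr/local/Cellar/openjdk/15.0.1/kafka/kafka-connect-mqtt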
- Standalone
Make the Kafka broker listen on all interfaces.
$ vim /usr/local/etc/kafka/server.properties
listeners = PLAINTEXT://0.0.0.0:9092
advertised.listeners = PLAINTEXT://127.0.0.1:9092
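Restart the broker so the listener change takes effect (assuming it was started via Homebrew):
$ brew services restart kafka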
Config Source
$ vim /usr/local/etc/kafka/kafka-connect-mqtt-source.properties
name=kafka_connect_mqtt_source
connector.class=be.jovacon.kafka.connect.MQTTSourceConnector
tasks.max=1
mqtt.connector.kafka.name=kafka_connect_mqtt_source
mqtt.broker=tcp://127.0.0.1:1883
mqtt.clientID=mqtt_source_client_id
mqtt.topic=test
kafka.topic=connect-mqtt-kafka
#key.converter=org.apache.kafka.connect.storage.StringConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter.schemas.enable=false
#value.converter.schemas.enable=false
#key.serializer=org.apache.kafka.common.serialization.StringSerializer
#value.serializer=org.apache.kafka.common.serialization.StringSerializer
Config Sink
$ vim /usr/local/etc/kafka/kafka-connect-mqtt-sink.properties
name=kafka_connect_mqtt_sink
connector.class=be.jovacon.kafka.connect.MQTTSinkConnector
tasks.max=1
mqtt.connector.kafka.name=kafka_connect_mqtt_sink
mqtt.broker=tcp://127.0.0.1:1883
mqtt.clientID=mqtt_sink_client_id
mqtt.topic=test2
topics=connect-mqtt-kafka
#topics.regex=downstream
#key.converter=org.apache.kafka.connect.storage.StringConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter.schemas.enable=false
#value.converter.schemas.enable=false
$ vim /usr/local/etc/kafka/connect-distributed.properties
# REST port for the Connect worker (used by the curl calls below)
listeners=HTTP://:19005
plugin.path=/usr/local/Cellar/openjdk/15.0.1/kafka,/usr/local/etc/kafka/plugins,/usr/local/opt/java/
$ vim /usr/local/etc/kafka/connect-standalone.properties
# REST port for the Connect worker (used by the curl calls below)
listeners=HTTP://:19005
plugin.path=/usr/local/Cellar/openjdk/15.0.1/kafka,/usr/local/etc/kafka/plugins,/usr/local/opt/java/
Start standalone
$ connect-standalone /usr/local/etc/kafka/connect-standalone.properties /usr/local/etc/kafka/kafka-connect-mqtt-source.properties /usr/local/etc/kafka/kafka-connect-mqtt-sink.properties
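Once the worker is up, both connectors should show on its REST interface (port taken from the worker config above):
$ curl -s http://localhost:19005/connectors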
- Distributed
Delete topics
$ kafka-topics --zookeeper localhost:2181 --topic connect-configs --delete
$ kafka-topics --zookeeper localhost:2181 --topic connect-offsets --delete
$ kafka-topics --zookeeper localhost:2181 --topic connect-status --delete
Cluster with replication factor 3 (three brokers)
$ cp /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/server-1.properties
$ vim /usr/local/etc/kafka/server-1.properties
broker.id=1
listeners = PLAINTEXT://0.0.0.0:9093
advertised.listeners = PLAINTEXT://127.0.0.1:9093
log.dir=/tmp/kafka-logs-1
$ cp /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/server-2.properties
$ vim /usr/local/etc/kafka/server-2.properties
broker.id=2
listeners = PLAINTEXT://0.0.0.0:9094
advertised.listeners = PLAINTEXT://127.0.0.1:9094
log.dir=/tmp/kafka-logs-2
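Start the two extra brokers (a sketch, assuming the base broker is already running via brew services):
$ kafka-server-start /usr/local/etc/kafka/server-1.properties &
$ kafka-server-start /usr/local/etc/kafka/server-2.properties &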
Create topics
$ kafka-topics --create --bootstrap-server localhost:9092 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
$ kafka-topics --create --bootstrap-server localhost:9092 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
$ kafka-topics --create --bootstrap-server localhost:9092 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
$ kafka-topics --list --zookeeper localhost:2181
Start Distributed
$ connect-distributed /usr/local/etc/kafka/connect-distributed.properties
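The distributed worker's REST API should respond once it has joined the group (returns the Connect version and Kafka cluster id):
$ curl -s http://localhost:19005/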
Load Sink
curl -X POST \
http://127.0.0.1:19005/connectors \
-H 'Content-Type: application/json' \
-d '{ "name": "kafka-mqtt-sink-connector",
"config":
{
"connector.class":"be.jovacon.kafka.connect.MQTTSinkConnector",
"mqtt.topic":"test2",
"topics":"connect-mqtt-kafka",
"mqtt.clientID":"mqtt_sink_client_id",
"mqtt.broker":"tcp://127.0.0.1:1883",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable":"false"
}
}'
Load Source
curl -X POST \
http://127.0.0.1:19005/connectors \
-H 'Content-Type: application/json' \
-d '{ "name": "kafka-mqtt-source-connector",
"config":
{
"connector.class":"be.jovacon.kafka.connect.MQTTSourceConnector",
"mqtt.topic":"test",
"kafka.topic":"connect-mqtt-kafka",
"mqtt.clientID":"mqtt_source_client_id",
"mqtt.broker":"tcp://127.0.0.1:1883",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable":"false"
}
}'
Check status
http://127.0.0.1:19005/connectors/kafka-mqtt-source-connector/status
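Or from the command line:
$ curl -s http://127.0.0.1:19005/connectors/kafka-mqtt-source-connector/status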
List topics
$ kafka-topics --list --zookeeper localhost:2181
$ kafka-topics --list --bootstrap-server 127.0.0.1:9092
Create topic
$ kafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic connect-mqtt-kafka
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Add partitions
$ kafka-topics --zookeeper localhost:2181 --alter --topic my-topic --partitions 16
Delete topic
$ kafka-topics --zookeeper localhost:2181 --delete --topic topic
Topic details
$ kafka-topics --describe --zookeeper localhost:2181 --topic test
Under-replicated partitions
$ kafka-topics --zookeeper localhost:2181 --describe --under-replicated-partitions
Post to Connect Interface
$ curl -s -X POST -H "Content-Type: application/json" http://127.0.0.1:19005/connectors -d '{"name":"kafka-connect-mqtt-source","config":{"connector.class":"be.jovacon.kafka.connect.MQTTSourceConnector","tasks.max":"1","mqtt.broker":"tcp://localhost:1883", "mqtt.topic":"test","kafka.topic":"connect-mqtt-kafka"}}'
Connectors
$ curl --header "Content-Type: application/json" http://127.0.0.1:19005/connectors
["mqtt-source-connector"]%
List connector plugins
http://127.0.0.1:19005/connector-plugins
Or
$ brew install jq
$ curl -s -XGET http://localhost:19005/connector-plugins|jq '.[].class'
"com.sintef.asam.MqttSinkConnector"
"com.sintef.asam.MqttSourceConnector"
"org.apache.kafka.connect.file.FileStreamSinkConnector"
"org.apache.kafka.connect.file.FileStreamSourceConnector"
"org.apache.kafka.connect.mirror.MirrorCheckpointConnector"
"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"
"org.apache.kafka.connect.mirror.MirrorSourceConnector"
$ curl -s -o /dev/null -w %{http_code} http://localhost:19005/connectors
Shutdown connector
curl -s -X PUT http://localhost:19005/connectors/mqtt-sink-connector/pause
curl -s -X GET http://localhost:19005/connectors/mqtt-sink-connector/status
curl -s -X DELETE http://localhost:19005/connectors/mqtt-sink-connector
curl -s -X PUT http://localhost:19005/connectors/mqtt-source-connector/pause
curl -s -X GET http://localhost:19005/connectors/mqtt-source-connector/status
curl -s -X DELETE http://localhost:19005/connectors/mqtt-source-connector
Consumer
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning --property print.key=true
Producer
New message
$ kafka-console-producer --broker-list localhost:9092 --topic test < messages.txt
$ kafka-console-producer --broker-list localhost:9092 --topic connect-mqtt-kafka < ../../../kafka/message.txt
$ kafka-console-producer --broker-list localhost:9092 --topic test
>{"Hello":"World!", "topic":"test"}
>Any message
Mosquitto client
$ brew install mosquitto
$ cd /usr/local/Cellar/mosquitto/2.0.7/bin/
$ mosquitto_pub -t 'test' -m 'helloWorld'
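To watch messages coming back out of the sink connector, subscribe to its MQTT topic in another shell (broker assumed on localhost:1883):
$ mosquitto_sub -t 'test2' -v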
Performance
$ kafka-producer-perf-test --topic position-reports --throughput 10000 --record-size 300 --num-records 20000 --producer-props bootstrap.servers="localhost:9092"
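The matching consumer-side benchmark (a sketch; reads back the records produced above):
$ kafka-consumer-perf-test --bootstrap-server localhost:9092 --topic position-reports --messages 20000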