## install
[Quickstart guide](https://kafka.apache.org/quickstart)
[Downloads](https://kafka.apache.org/downloads)
* Installation steps
  * Download and unpack
    * wget http://ftp.mirror.tw/pub/apache/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
    * tar -xzf kafka_2.11-0.10.2.0.tgz
    * cd kafka_2.11-0.10.2.0
* Start the servers
  * zookeeper
    * bin/zookeeper-server-start.sh config/zookeeper.properties
  * kafka-server
    * bin/kafka-server-start.sh config/server.properties
* Create a topic
  * Create a topic named test
    * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  * List the topics that have been created
    * bin/kafka-topics.sh --list --zookeeper localhost:2181
* Send some messages
  * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  * Each line you type is sent as a message; press `Ctrl + C` to exit
* Start a consumer
  * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  * You should see the messages you just sent
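A typical session looks like this, with the producer and consumer running in separate terminals (the message text is just an example):

```
# Terminal 1 - console producer: each line typed is sent as a message
> This is a message
> This is another message

# Terminal 2 - console consumer: the messages are printed back
This is a message
This is another message
```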
* Setting up a multi-broker cluster (2 extra brokers)
  * cp config/server.properties config/server-1.properties
  * cp config/server.properties config/server-2.properties
  * Edit the copied properties files
```
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
```
```
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
```
* Start the new servers
  * bin/kafka-server-start.sh config/server-1.properties &
  * bin/kafka-server-start.sh config/server-2.properties &
* Create a new topic
  * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
* Describe the topic
  * bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
* Send messages to the new topic
  * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
  * Each line you type is sent as a message; press `Ctrl + C` to exit
* start consumer
  * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
  * You should see the messages you just sent
* Find the running processes
  * ps aux | grep server-1.properties
* Kill one of the servers
  * kill -9 7564
  * Observe that leadership has switched to another broker
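Running `--describe` again after the kill shows the switch: the dead broker drops out of the in-sync replica set (Isr) and a surviving broker takes over as leader. The output is shaped roughly like this (the actual broker ids depend on which process was killed):

```
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 1,2,0	Isr: 2,0
```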
## Use Kafka Connect to import/export data
* echo -e "foo\nbar" > test.txt
* bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
* cat test.sink.txt
* bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
* echo "Another line" >> test.txt
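The standalone run above relies on the file connector configs shipped with the distribution: the source connector reads lines from test.txt into the connect-test topic, and the sink writes them to test.sink.txt. The source config looks roughly like this:

```
# config/connect-file-source.properties (shipped with the Kafka distribution)
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
```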
## Use Kafka Streams to process data
```
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-file-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Group the text words as message keys.
    .groupBy((key, value) -> value)
    // Count the occurrences of each word (message key).
    .count("Counts");

// Store the running counts as a changelog stream to the output topic.
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");
```
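The core transformation of the topology above can be sketched in plain Java to see what counts it converges to; `WordCount` and `countWords` are illustrative names, not part of the Kafka Streams API:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCount {
    // Mirrors the topology above: lowercase each line, split on non-word
    // characters, and keep a running count per word.
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords(Arrays.asList(
                "all streams lead to kafka",
                "hello kafka streams",
                "join kafka summit"));
        System.out.println(counts.get("kafka"));  // "kafka" appears once in each of the three lines
    }
}
```

The real topology computes the same per-word totals, but incrementally: each incoming line updates the `Counts` state store and emits the new total downstream.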
* echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
* ```
  bin/kafka-topics.sh --create \
      --zookeeper localhost:2181 \
      --replication-factor 1 \
      --partitions 1 \
      --topic streams-file-input
  ```
* bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
* Run the Streams word-count demo application (it reads streams-file-input and writes streams-wordcount-output)
  * bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
* ```
  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
      --topic streams-wordcount-output \
      --from-beginning \
      --formatter kafka.tools.DefaultMessageFormatter \
      --property print.key=true \
      --property print.value=true \
      --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
      --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
  ```
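Because the output topic is a changelog stream, the consumer prints one update per word occurrence rather than a single final table; for the sample file-input.txt the output looks roughly like this, ending with the final counts:

```
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1
```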
* Check which process occupies a port
```
To find which process occupies a Linux TCP port, use one of the following commands:
sudo lsof -i
sudo lsof -i | grep TCP
sudo lsof -i :80 | grep LISTEN
sudo netstat -lptu
sudo netstat -tulpn
sudo ls -l /proc/$pid/exe
```
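The same check can be done from code, e.g. before starting a broker on a port. A minimal sketch (`PortCheck` and `isFree` are illustrative names) that treats a port as occupied when it cannot be bound:

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortCheck {
    // Returns true if the given TCP port can be bound locally,
    // i.e. no other process is currently listening on it.
    static boolean isFree(int port) {
        try (ServerSocket socket = new ServerSocket(port)) {
            return true;   // bind succeeded; socket is closed again immediately
        } catch (IOException e) {
            return false;  // bind failed; something already holds the port
        }
    }

    public static void main(String[] args) {
        // 2181 = ZooKeeper default, 9092 = Kafka broker default
        System.out.println("2181 free: " + isFree(2181));
        System.out.println("9092 free: " + isFree(9092));
    }
}
```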