@roc26002w
Last active June 14, 2020 06:01
## Install

[Quickstart guide](https://kafka.apache.org/quickstart)
[Download page](https://kafka.apache.org/downloads)

* Installation steps
  * Download and unpack
    * wget http://ftp.mirror.tw/pub/apache/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
    * tar -xzf kafka_2.11-0.10.2.0.tgz
    * cd kafka_2.11-0.10.2.0
* Start the servers
  * ZooKeeper (must be running before the broker)
    * bin/zookeeper-server-start.sh config/zookeeper.properties
  * Kafka broker
    * bin/kafka-server-start.sh config/server.properties
* Create a topic
  * Create a topic named test
    * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  * List the topics that have been created
    * bin/kafka-topics.sh --list --zookeeper localhost:2181
* Send messages
  * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  * Type messages to send them; press `Ctrl + C` to exit
* Start a consumer
  * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  * You should see the messages sent above
* Setting up a multi-broker cluster (two extra brokers)
  * cp config/server.properties config/server-1.properties
  * cp config/server.properties config/server-2.properties
  * Edit the copied properties files (broker.id, listener port, and log.dir must be unique per broker)
```
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
```
```
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
```
* Start the two new servers
  * bin/kafka-server-start.sh config/server-1.properties &
  * bin/kafka-server-start.sh config/server-2.properties &
* Create a new topic with replication factor 3
  * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
* Describe the topic
  * bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
* Send messages to the new topic
  * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
  * Type messages to send them; press `Ctrl + C` to exit
* Start a consumer
  * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
  * You should see the messages sent above
* Find the running broker process
  * ps aux | grep server-1.properties
* Kill one of the brokers (use the PID found above)
  * kill -9 7564
  * Describe the topic again and observe that leadership has switched to another broker
* Use Kafka Connect to import/export data
  * echo -e "foo\nbar" > test.txt
  * bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  * cat test.sink.txt
  * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
  * echo "Another line" >> test.txt
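The two file-connector configs passed above ship with the Kafka distribution; they look roughly like this (verify against the files in your own config/ directory, as contents may differ between versions):
```
config/connect-file-source.properties:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
```
```
config/connect-file-sink.properties:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
```
The source connector tails test.txt into the connect-test topic, and the sink connector writes that topic back out to test.sink.txt, which is why appending to test.txt shows up in both the sink file and the console consumer.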
## Use Kafka Streams to process data
```
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// Construct a `KStream` from the input topic "streams-file-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");
KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// Group the text words as message keys
.groupBy((key, value) -> value)
// Count the occurrences of each word (message key).
.count("Counts");
// Store the running counts as a changelog stream to the output topic.
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");
```
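Stripped of the Kafka plumbing, the topology above is a plain word count: split each line on non-word characters, lowercase, group by word, count. A minimal Python sketch of the same transformation (names are my own, not part of any Kafka API):

```python
import re
from collections import Counter

def word_counts(lines):
    # Mirror the Streams topology: flatMapValues splits each line on
    # non-word characters after lowercasing; groupBy + count tallies
    # the occurrences of each word.
    words = [w for line in lines
               for w in re.split(r"\W+", line.lower()) if w]
    return Counter(words)

lines = ["all streams lead to kafka", "hello kafka streams"]
counts = word_counts(lines)
# counts["kafka"] == 2, counts["streams"] == 2
```

The real Streams job differs in that it runs continuously and emits a changelog of running counts to the output topic, rather than a one-shot total.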
* echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
* ```
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-file-input
```
* bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
* ```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
```
* Find which process is using a TCP port
```
# To see which process (e.g. the Kafka broker on 9092) holds a Linux TCP port, use one of:
sudo lsof -i
sudo lsof -i | grep TCP
sudo lsof -i :80 | grep LISTEN
sudo netstat -lptu
sudo netstat -tulpn
sudo ls -l /proc/$pid/exe
```
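You can also probe a port programmatically, e.g. to check whether a broker is up before running the commands above; a small Python sketch (function name is my own, not from the Kafka tooling):

```python
import socket

def port_in_use(host, port, timeout=1.0):
    # Attempt a TCP connection; connect_ex returns 0 when something
    # is listening on the port, a nonzero errno otherwise.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        return s.connect_ex((host, port)) == 0

# e.g. port_in_use("localhost", 9092) reports whether a broker is listening
```

Unlike lsof/netstat this does not tell you *which* process holds the port, only whether the port answers.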