Use KStream.transformValues
val materialized = Materialized.as[String, Long, ByteArrayKeyValueStore]("poznan-state-store")
FIXME
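The wiring above is still marked FIXME; below is a minimal sketch of one way to finish it, not the workshop's reference solution. It assumes String keys and values on exercise1-input, registers a persistent key-value store under the name from the Materialized line, and uses transformValues to emit how many times each value has been seen so far. (Materialized.as itself is what you would hand to an aggregation such as count; transformValues instead uses a store added via StreamsBuilder.addStateStore.)

import org.apache.kafka.common.serialization.{Serdes => JSerdes}
import org.apache.kafka.streams.kstream.{ValueTransformer, ValueTransformerSupplier}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

val builder = new StreamsBuilder
val storeName = "poznan-state-store"

// register the store with the topology so transformValues can reference it by name
builder.addStateStore(
  Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore(storeName),
    JSerdes.String(),
    JSerdes.Long()))

// stateful transformer: returns how many times the current value has been seen so far
val seenCounter = new ValueTransformerSupplier[String, Long] {
  override def get(): ValueTransformer[String, Long] = new ValueTransformer[String, Long] {
    private var store: KeyValueStore[String, java.lang.Long] = _

    override def init(context: ProcessorContext): Unit =
      store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, java.lang.Long]]

    override def transform(value: String): Long = {
      val seen = Option(store.get(value)).map(_.longValue).getOrElse(0L) + 1L
      store.put(value, seen)
      seen
    }

    override def close(): Unit = ()
  }
}

builder
  .stream[String, String]("exercise1-input")
  .transformValues(seenCounter, storeName)
  .to("exercise1-output")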
Use KStream.branch and KStream.print with different labels (even and odd), as in the sketch below.
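A minimal sketch of that idea, assuming the values are numeric strings and reusing the exercise1-input topic from the commands further down; predicates are evaluated in order, and each branch gets its own Printed label.

import org.apache.kafka.streams.kstream.Printed
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.ImplicitConversions._

val builder = new StreamsBuilder

// branch returns one KStream per predicate, in the order the predicates are given
val Array(evens, odds) = builder
  .stream[String, String]("exercise1-input")
  .branch(
    (_, value) => value.toLong % 2 == 0,  // even values
    (_, value) => value.toLong % 2 != 0)  // odd values

evens.print(Printed.toSysOut[String, String].withLabel("even"))
odds.print(Printed.toSysOut[String, String].withLabel("odd"))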
- http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html
- Exercise: write an application that counts the words in a sentence (the value); see the sketch after this list
- KStream.groupBy (returns a KGroupedStream; count on it gives a KTable)
- KTable.toStream
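A minimal sketch of that pipeline, using the GroupByApp topics from the commands below; lower-casing and splitting on whitespace are assumptions about how the exercise tokenises sentences.

import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.ImplicitConversions._

val builder = new StreamsBuilder

builder
  .stream[String, String]("GroupByApp-input")
  // split each sentence (the value) into words
  .flatMapValues(_.toLowerCase.split("\\s+").toList)
  // make the word the key; groupBy returns a KGroupedStream and repartitions the data
  .groupBy((_, word) => word)
  // count occurrences per word; count gives a KTable[String, Long]
  .count()
  // turn the changelog back into a stream of (word, count) updates
  .toStream
  .to("GroupByApp-output")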
cat sentences.csv | ./bin/kafka-console-producer.sh \
--broker-list :9092 \
--topic GroupByApp-input \
--property parse.key=true \
--property key.separator=,
./bin/kafka-console-consumer.sh \
--bootstrap-server :9092 \
--topic GroupByApp-output \
--property print.key=true \
--value-deserializer org.apache.kafka.common.serialization.LongDeserializer
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0)
- Processor API
- Debugging
- Running from the command line
./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
./bin/kafka-consumer-groups.sh --reset-offsets --to-latest --execute --group HelloWorldApp --bootstrap-server :9092 --topic exercise1-input
./bin/kafka-console-consumer.sh \
--bootstrap-server :9092 \
--topic exercise1-output \
--property print.key=true \
--key-deserializer org.apache.kafka.common.serialization.LongDeserializer
./bin/kafka-console-producer.sh --broker-list :9092 --topic exercise1-input --property parse.key=true --property key.separator=,
- sbt assembly
sbt assembly
./bin/kafka-topics.sh --create --topic exercise1-input --zookeeper :2181 --partitions 4 --replication-factor 1
- java -jar target/scala-2.12/poznan-workshop-kafka-streams-assembly-0.1.jar
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2)
- 2 threads with 2 partitions each
- java -jar target/scala-2.12/poznan-workshop-kafka-streams-assembly-0.1.jar
- 2 instances of the application
- 4 threads with 1 partition each
./bin/kafka-topics.sh --alter --topic exercise1-input --partitions 8 --zookeeper :2181
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
java -jar target/scala-2.12/poznan-workshop-kafka-streams-assembly-0.1.jar krzysztof
java -jar target/scala-2.12/poznan-workshop-kafka-streams-assembly-0.1.jar kacper
current active tasks: [0_0, 0_1]
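A guess at how the jar maps that argument to application.id (an assumption, not the workshop's actual code): the first CLI argument becomes the id, so krzysztof and kacper run as two separate Kafka Streams applications, each with its own consumer group and its own task assignment (as in the active-tasks log line above).

import org.apache.kafka.streams.StreamsConfig

object WorkshopApp extends App {
  // assumption: first CLI argument becomes application.id; fall back to the class name
  val appId = args.headOption.getOrElse(getClass.getName.replace("$", ""))

  val props = new java.util.Properties
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")

  // build the topology and start KafkaStreams exactly as in HelloWorldApp further down
}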
./bin/kafka-topics.sh --describe --topic exercise1-input --zookeeper :2181
./bin/kafka-topics.sh --list --zookeeper :2181
./bin/kafka-topics.sh --delete --topic exercise1-input --zookeeper :2181
./bin/kafka-topics.sh --create --topic exercise1-input --zookeeper :2181 --partitions 2 --replication-factor 1
build.sbt:
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.1.0"
libraryDependencies += "org.slf4j" % "slf4j-log4j12" % "1.7.25"
Watch out for "How to define Scala API for Kafka Streams as dependency in build.sbt?"
import org.apache.kafka.streams.scala._
import Serdes._
import ImplicitConversions._
object HelloWorldApp extends App {
println("Hello World")
// For Scala devs only
// Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import Serdes._
import ImplicitConversions._
// Step 1. Create the Topology
val builder = new StreamsBuilder
import org.apache.kafka.streams.kstream.Printed
val console = Printed
.toSysOut[String, String]
.withLabel("DEBUG")
builder
.stream[String, String]("exercise1-input")
.print(console)
val topology = builder.build
println(topology.describe)
// Step 2. Run the topology
// Create the execution environment
val appId = getClass.getName.replace("$", "")
import org.apache.kafka.streams.KafkaStreams
val props = new java.util.Properties
import org.apache.kafka.streams.StreamsConfig
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")
val ks = new KafkaStreams(topology, props)
// optional
ks.cleanUp()
// mandatory
ks.start()
}
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
./bin/kafka-console-producer.sh --broker-list :9092 --topic [topic-name]
// Not needed yet: --property parse.key=true --property key.separator=,
./bin/kafka-console-consumer.sh \
--bootstrap-server :9092 \
--topic exercise1-input
// Not needed yet: --property print.key=true
- Laptop + laptop charger
- Java Standard Edition (Java 8 to Java 11)
- Apache Kafka (downloaded)
- IntelliJ IDEA
- (optional) Scala plugin
- sbt