-
-
Save miguno/e5b39328e944748c653c2f3706be49fd to your computer and use it in GitHub Desktop.
import java.lang.Long
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}

import scala.collection.JavaConverters.asJavaIterableConverter

// This Scala application is elastic, scalable, distributed (if need be), and fault-tolerant.
// The code below can be used in production to count millions of words per second.
// How it works: http://docs.confluent.io/current/streams/developer-guide.html
// More examples and demo applications: https://github.com/confluentinc/examples
object WordCountApplication {

  def main(args: Array[String]): Unit = {
    // Streams configuration: the application id doubles as the consumer-group id
    // and the prefix for internal topic/state-store names; key/value serdes default
    // to String for topics that don't override them explicitly.
    val streamsConfiguration: Properties = {
      val p = new Properties()
      p.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-scala")
      p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
      p.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
      p.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
      p
    }

    val stringSerde: Serde[String] = Serdes.String()
    // NOTE: `Long` here is java.lang.Long (imported above), which is the type
    // Kafka's built-in Long serde works with — not scala.Long.
    val longSerde: Serde[Long] = Serdes.Long()

    val builder: KStreamBuilder = new KStreamBuilder()
    val textLines: KStream[String, String] = builder.stream("input-topic")

    // Split each line into lowercase words, re-key each record by the word itself,
    // and count occurrences per word into a KTable.
    val wordCounts: KTable[String, Long] = textLines
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      .groupBy((_, word) => word)
      .count("word-counts") // "word-counts" names the backing local state store

    // The output topic has String keys but Long values, so the value serde
    // must be passed explicitly (the configured default is String).
    wordCounts.to(stringSerde, longSerde, "output-topic")

    val streams: KafkaStreams = new KafkaStreams(builder, streamsConfiguration)
    streams.start()

    // Gracefully stop the Streams application on JVM shutdown (SIGTERM / Ctrl-C),
    // allowing up to 10 seconds for tasks to commit and close.
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      streams.close(10, TimeUnit.SECONDS)
    }))
  }
}
Hi @miguno, I can't make this work. I'm running the main method, producing messages (using the kafka-console-producer) and getting nothing when consuming the messages (using the kafka-console-consumer). If I change the code that counts the words for something simpler like:
val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase())
uppercasedWithMapValues.to("output-topic")
works fine. Could the problem be that the standard console consumer fails to deserialize the java.lang.Long values in the output topic? See below how I'm producing and consuming the messages:
./kafka-console-producer.sh --topic input-topic --broker-list localhost:9092
./kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning
Thank you in advance!
I've solved the previous issue (in my unit tests) by setting cache.max.bytes.buffering to 0. See the following Stack Overflow question.
How did you manage to extract the lines from textLines? I am getting the error below:
[error] /home/felipe/workspace-scala-eclipse/scala-akka-stream-kafka/src/main/scala/com/kafka/streams/WordCountStream.scala:32:78: missing parameter type for expanded function ((x$1) => x$1.toUpperCase())
[error] val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase())
when I do both of the codes:
val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues((x) => x.toUpperCase())
uppercasedWithMapValues.to("streams-wordcount-output")
or ......
val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
.groupBy((_, word) => word)
.count("word-counts")
I saw that there is a problem because Kafka is written in Java and we are using Scala.
How did you get your code to work, or did you solve it in another way?
Thanks, Felipe
Thank you for this gist! 👍 It solved a problem I was having converting the Java example to Scala :)
Awesome