
@miguno
Last active November 29, 2017 20:10
WordCount algorithm implemented via a standard Scala application that uses Kafka's Streams API (Kafka 0.10.2)
import java.lang.Long
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
import scala.collection.JavaConverters.asJavaIterableConverter
// This Scala application is elastic, scalable, distributed (if need be), and fault-tolerant.
// The code below can be used in production to count millions of words per second.
// How it works: http://docs.confluent.io/current/streams/developer-guide.html
// More examples and demo applications: https://github.com/confluentinc/examples
object WordCountApplication {

  def main(args: Array[String]) {
    val streamsConfiguration: Properties = {
      val p = new Properties()
      p.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-scala")
      p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
      p.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
      p.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
      p
    }

    val stringSerde: Serde[String] = Serdes.String()
    val longSerde: Serde[Long] = Serdes.Long()

    val builder: KStreamBuilder = new KStreamBuilder()
    val textLines: KStream[String, String] = builder.stream("input-topic")
    val wordCounts: KTable[String, Long] = textLines
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      .groupBy((_, word) => word)
      .count("word-counts")
    wordCounts.to(stringSerde, longSerde, "output-topic")

    val streams: KafkaStreams = new KafkaStreams(builder, streamsConfiguration)
    streams.start()

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      streams.close(10, TimeUnit.SECONDS)
    }))
  }

}
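
The gist itself ships no build definition. A minimal sbt sketch for compiling the application above could look like this; the version numbers are assumptions, not part of the gist (Kafka Streams 0.10.2.x to match the API used here, and Scala 2.12 so that the lambda passed to the shutdown-hook Thread is accepted as a java.lang.Runnable):

  // build.sbt (sketch; adjust versions to your environment)
  name := "word-count-scala"

  scalaVersion := "2.12.2"

  libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.10.2.1"

On Scala 2.11 the lambda-to-Runnable (and lambda-to-ValueMapper) conversion does not happen automatically; see the last comment in this thread.
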
@Rtowne-Janrain

Thank you for this gist! 👍 It solved a problem I was having converting the Java example to Scala :)

Awesome

@ardlema

ardlema commented Aug 1, 2017

Hi @miguno, I can't get this to work. I'm running the main method, producing messages (using the kafka-console-producer), and getting nothing when consuming them (using the kafka-console-consumer). If I change the code that counts the words to something simpler, like:

val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase())
uppercasedWithMapValues.to("output-topic")

it works fine. Could the problem be the deserialization of the java.lang.Long values in the output topic by the standard console consumer? Here is how I'm producing and consuming the messages:

./kafka-console-producer.sh --topic input-topic --broker-list localhost:9092
./kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning

Thank you in advance!
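
A likely explanation for the question above: the application writes its counts to output-topic as java.lang.Long values, so the default console consumer (which assumes string values) does not print them in a readable form, and record caching can additionally delay output until the commit interval elapses. A sketch of a consumer invocation that decodes the counts, using the standard deserializer properties of the console consumer and the broker address from the comment above:

./kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning \
    --property print.key=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
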

@ardlema

ardlema commented Aug 16, 2017

I've solved the previous issue (in my unit tests) by setting cache.max.bytes.buffering to 0. See the following Stack Overflow question
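
For reference, a sketch of that setting applied to the streams configuration in the gist above (the property corresponds to StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG; a value of 0 disables record caching so every count update is forwarded immediately, which is handy in tests but usually left at the default in production):

  // Disable record caching: forward every KTable update downstream immediately
  // instead of buffering until the cache fills or the commit interval elapses.
  p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")
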

@felipegutierrez

felipegutierrez commented Nov 29, 2017

Hi @ardlema or @miguno,

How did you manage to extract the lines from textLines? I am getting the error below:

[error] /home/felipe/workspace-scala-eclipse/scala-akka-stream-kafka/src/main/scala/com/kafka/streams/WordCountStream.scala:32:78: missing parameter type for expanded function ((x$1) => x$1.toUpperCase())
[error]   val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase())

The error appears with both of the following snippets:

  val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues((x) => x.toUpperCase())
  uppercasedWithMapValues.to("streams-wordcount-output")

or:

  val wordCounts: KTable[String, Long] = textLines
        .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
        .groupBy((_, word) => word)
        .count("word-counts")

I saw that there is a problem because Kafka is written in Java and we are using Scala.
How did you get your code to work, or did you solve it in another way?
Thanks, Felipe
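
The "missing parameter type for expanded function" error above is the usual symptom of calling Kafka's Java API from Scala 2.11, which does not automatically convert Scala lambdas into Java functional interfaces such as ValueMapper. Two common fixes: build with Scala 2.12, where the lambda syntax used in this gist compiles as-is, or instantiate the interface explicitly. A sketch of the explicit form, assuming the same textLines stream as in the gist:

  import org.apache.kafka.streams.kstream.ValueMapper

  // Explicit ValueMapper instead of the lambda _.toUpperCase(),
  // so the code also compiles on Scala 2.11 (no SAM conversion needed).
  val uppercasedWithMapValues: KStream[String, String] =
    textLines.mapValues(new ValueMapper[String, String] {
      override def apply(line: String): String = line.toUpperCase()
    })
  uppercasedWithMapValues.to("streams-wordcount-output")
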
