Skip to content

Instantly share code, notes, and snippets.

@xdralex
Created January 5, 2017 00:14
Show Gist options
  • Save xdralex/845bcf8f06ab0cfcf9785d9f95450b88 to your computer and use it in GitHub Desktop.
package kstreams_demo
import java.lang.{Integer => JInt}
import java.util.Properties
import kstreams_demo.converters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream._
import org.apache.kafka.streams.kstream.internals.TimeWindow
import org.apache.kafka.streams.processor.TimestampExtractor
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.joda.time.{DateTime, DateTimeZone}
import scala.collection.JavaConverters._
/*
[Environment]
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
[Setup]
bin/kafka-topics.sh --create --topic TimeInputTopic --zookeeper localhost:2181/kafka --partitions 1 --replication-factor 1
[Run]
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TimeInputTopic --property parse.key=true --property key.separator=,
*/
object TimeDemo
{
  /** Parses a record value of the form "&lt;ISO-8601 timestamp&gt; &lt;integer&gt;".
    *
    * @param s raw record value, e.g. "2017-01-05T00:00:00Z 42"
    * @return the parsed event timestamp and the boxed integer payload
    * @throws IllegalArgumentException if the value does not split into exactly two fields
    */
  def parse(s: String): (DateTime, JInt) = {
    s.split(" ").filter(_.nonEmpty).toSeq match {
      case Seq(a, b) => (DateTime.parse(a), b.toInt)
      case _ =>
        // Previously a non-exhaustive match: a malformed record raised an opaque
        // MatchError and killed the stream thread. Fail with a descriptive message.
        throw new IllegalArgumentException(s"Expected '<timestamp> <value>', got: '$s'")
    }
  }

  /** Derives each record's stream time from the timestamp embedded in its value,
    * rather than the broker/producer record timestamp.
    */
  class DemoTimestampExtractor extends TimestampExtractor
  {
    override def extract(record: ConsumerRecord[AnyRef, AnyRef]): Long = {
      // Values arrive as plain strings from kafka-console-producer (see header comment).
      parse(record.value().asInstanceOf[String])._1.getMillis
    }
  }

  /** Builds and starts the topology: reads "timestamp value" lines from
    * TimeInputTopic, sums the integer payloads per key in one-minute tumbling
    * windows, and prints each input record and each window update.
    */
  def main(args: Array[String]): Unit = {
    val builder = new KStreamBuilder

    val config = new Properties()
    config.putAll(Map(
      StreamsConfig.APPLICATION_ID_CONFIG -> "time-demo10",
      StreamsConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      StreamsConfig.ZOOKEEPER_CONNECT_CONFIG -> "localhost:2181/kafka",
      StreamsConfig.KEY_SERDE_CLASS_CONFIG -> Serdes.String.getClass.getName,
      StreamsConfig.VALUE_SERDE_CLASS_CONFIG -> Serdes.String.getClass.getName,
      // Passed as a Class object; Kafka instantiates it reflectively.
      StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG -> classOf[DemoTimestampExtractor],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
    ).asJava)

    val input: KStream[String, String] = builder.stream(Serdes.String(), Serdes.String(), "TimeInputTopic")
    input.foreach((k: String, v: String) => println(s"Input: $k - $v"))

    // One-minute tumbling windows (size == advance).
    // NOTE(review): until(30000) retains state for less than the 60000 ms window
    // span, so windows may be dropped before they close — confirm this is intended.
    val window: Windows[TimeWindow] = TimeWindows.of(60000).advanceBy(60000).until(30000)

    // Sum the integer payloads per key within each window.
    val aggregated: KTable[Windowed[String], JInt] = input
      .mapValues((v: String) => parse(v)._2)
      .groupByKey(Serdes.String(), Serdes.Integer())
      // Int.box replaces the previous `(a + b).asInstanceOf[JInt]` cast: same
      // boxed result, no downcast.
      .reduce((a: JInt, b: JInt) => Int.box(a + b), window, "TimeStore1")

    aggregated.foreach {
      (w: Windowed[String], s: JInt) =>
        val start = new DateTime(w.window().start(), DateTimeZone.UTC)
        val end = new DateTime(w.window().end(), DateTimeZone.UTC)
        println(s"Aggregated: $start..$end - ${w.key()} - $s")
    }

    val streams = new KafkaStreams(builder, config)
    streams.start()

    // Close the topology cleanly (flushing state stores) on JVM shutdown.
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
      override def run(): Unit = streams.close()
    }))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment