Skip to content

Instantly share code, notes, and snippets.

@timvw
Last active September 4, 2019 20:18
Show Gist options
  • Save timvw/6319fb9ac96c210d2f155c6fac02acbc to your computer and use it in GitHub Desktop.
Save timvw/6319fb9ac96c210d2f155c6fac02acbc to your computer and use it in GitHub Desktop.
Scala friendlier API for Kafka Streams
val builder = new KStreamBuilder()
val twitterStream: KStream[String, String] = builder.stream[String, String]("twitter")
import be.icteam.demo.{KafkaStreamWrapper => KSW}
val pipeline =
KSW.filter { (k: String, v: String) => v.contains("apache")} _ andThen
KSW.filter((k, v) => true) andThen
KSW.foreach((k, v) => println(v))
pipeline(twitterStream)
val builder = new KStreamBuilder()
val twitterStream: KStream[String, String] = builder.stream[String, String]("twitter")
import KafkaStreamWrapper._
twitterStream
.filter((k: String, v: String) => v.contains("apache"))
.filter((k: String, v: String) => true)
.foreach((k: String, v: String) => println(v))
package be.icteam.demo
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream._
import scala.collection.JavaConverters._
object KafkaStreamWrapper {
private def buildPredicate[K, V](tester: (K, V) => Boolean) : Predicate[K, V] = new Predicate[K, V] {
override def test(key: K, value: V): Boolean = tester(key, value)
}
private def buildForeachAction[K, V](fn: (K, V) => Unit): ForeachAction[K, V] = new ForeachAction[K, V] {
override def apply(key: K, value: V): Unit = fn(key, value)
}
private def buildValueMapper[V1, V2](mapper: V1 => V2) : ValueMapper[V1, V2] = new ValueMapper[V1, V2] {
override def apply(value: V1): V2 = mapper(value)
}
private def buildKeyValueMapper[K, V, R](mapper: (K, V) => R) : KeyValueMapper[K, V, R] = new KeyValueMapper[K, V, R] {
override def apply(key: K, value: V): R = mapper(key, value)
}
private def buildKeyValue[K, V](key: K, value: V) : KeyValue[K, V] = new KeyValue[K, V](key, value)
private def buildKeyValue[K, V](tuple: (K, V)) : KeyValue[K, V] = new KeyValue[K, V](tuple._1, tuple._2)
def filter[K, V](tester: (K, V) => Boolean)(s: KStream[K, V]) : KStream[K, V] = s.filter(buildPredicate(tester))
def filterNot[K, V](tester: (K, V) => Boolean)(s: KStream[K, V]) : KStream[K, V] = s.filterNot(buildPredicate(tester))
def selectKey[K, V, R](mapper: (K, V) => R)(s: KStream[K, V]) : KStream[R, V] = s.selectKey(buildKeyValueMapper(mapper))
def map[K1, V1, K2, V2](mapper: (K1, V1) => (K2, V2))(s: KStream[K1, V1]) : KStream[K2, V2] = s.map(buildKeyValueMapper((key, value) => buildKeyValue(mapper(key, value))))
def mapValues[K, V1, V2](mapper: V1 => V2)(s: KStream[K, V1]) : KStream[K, V2] = s.mapValues(buildValueMapper(mapper))
def flatMap[K1, V1, K2, V2](mapper: (K1, V1) => scala.collection.Iterable[(K2, V2)])(s: KStream[K1, V1]) : KStream[K2, V2] =
s.flatMap(buildKeyValueMapper((k: K1, v: V1) => mapper(k, v).map(mm => buildKeyValue(mm._1, mm._2)).asJava))
def flatMapValues[K, V1, V2](mapper: V1 => scala.collection.Iterable[V2])(s: KStream[K, V1]) : KStream[K, V2] =
s.flatMapValues(buildValueMapper((v: V1) => mapper(v).asJava))
def branch[K, V](predicates: ((K, V) => Boolean)*)(s: KStream[K, V]) : Array[KStream[K, V]] = {
val zzz: Seq[Predicate[K, V]] = predicates.map(x => buildPredicate(x))
s.branch(zzz: _*)
}
def through[K, V](topic: String)(s: KStream[K, V]) : KStream[K, V] = s.through(topic)
def foreach[K, V](fn: (K, V) => Unit)(s: KStream[K, V]) : Unit = s.foreach(buildForeachAction(fn))
implicit class Wrapper[K, V](s: KStream[K, V]) {
def filter(tester: (K, V) => Boolean) : KStream[K, V] = KafkaStreamWrapper.filter(tester)(s)
def filterNot(tester: (K, V) => Boolean): KStream[K, V] = KafkaStreamWrapper.filterNot(tester)(s)
def selectKey[R](mapper: (K, V) => R): KStream[R, V] = KafkaStreamWrapper.selectKey(mapper)(s)
def map[K2, V2](mapper: (K, V) => (K2, V2)): KStream[K2, V2] = KafkaStreamWrapper.map(mapper)(s)
def mapValues[V2](mapper: V => V2): KStream[K, V2] = KafkaStreamWrapper.mapValues(mapper)(s)
def flatMap[K2, V2](mapper: (K, V) => scala.collection.Iterable[(K2, V2)]): KStream[K2, V2] = KafkaStreamWrapper.flatMap(mapper)(s)
def flatMapValues[V2](mapper: V => scala.collection.Iterable[V2]): KStream[K, V2] = KafkaStreamWrapper.flatMapValues(mapper)(s)
def branch(predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] = KafkaStreamWrapper.branch(predicates :_*)(s)
def through(topic: String): KStream[K, V] = KafkaStreamWrapper.through(topic)(s)
def foreach(fn: (K, V) => Unit): Unit = KafkaStreamWrapper.foreach(fn)(s)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment