Last active
September 4, 2019 20:18
-
-
Save timvw/6319fb9ac96c210d2f155c6fac02acbc to your computer and use it in GitHub Desktop.
A Scala-friendlier API for Kafka Streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Usage example 1: build a processing pipeline by composing the curried
// KafkaStreamWrapper functions as ordinary Scala functions.
val builder = new KStreamBuilder()
val twitterStream: KStream[String, String] = builder.stream[String, String]("twitter")

import be.icteam.demo.{KafkaStreamWrapper => KSW}

// Supplying only the first argument list of a KSW function leaves a
// `KStream => ...` stage that can be chained with `andThen`.
val mentionsApache: KStream[String, String] => KStream[String, String] =
  KSW.filter((_: String, v: String) => v.contains("apache"))

val keepEverything: KStream[String, String] => KStream[String, String] =
  KSW.filter((_: String, _: String) => true)

val printValue: KStream[String, String] => Unit =
  KSW.foreach((_: String, v: String) => println(v))

val pipeline = mentionsApache andThen keepEverything andThen printValue

pipeline(twitterStream)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Usage example 2: the same pipeline written with the extension methods that
// `import KafkaStreamWrapper._` brings into scope.
val builder = new KStreamBuilder()
val twitterStream: KStream[String, String] = builder.stream[String, String]("twitter")

import KafkaStreamWrapper._

// Each step is named so the intermediate streams are visible.
val apacheTweets: KStream[String, String] =
  twitterStream.filter((_: String, v: String) => v.contains("apache"))

val unchangedTweets: KStream[String, String] =
  apacheTweets.filter((_: String, _: String) => true)

unchangedTweets.foreach((_: String, v: String) => println(v))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package be.icteam.demo | |
import org.apache.kafka.streams.KeyValue | |
import org.apache.kafka.streams.kstream._ | |
import scala.collection.JavaConverters._ | |
/**
 * Scala-friendly helpers around the Kafka Streams `KStream` Java API.
 *
 * Each public function accepts plain Scala functions, adapts them to the
 * matching Kafka Streams callback interface (`Predicate`, `ValueMapper`,
 * `KeyValueMapper`, `ForeachAction`) and delegates to the `KStream` method of
 * the same name.
 *
 * The stream is always the last, separate parameter list, so supplying only
 * the first argument list yields a `KStream => ...` function that can be
 * composed with `andThen`. Importing `KafkaStreamWrapper._` additionally
 * exposes the same operations as extension methods through [[Wrapper]].
 */
object KafkaStreamWrapper {

  /** Adapts a Scala two-argument boolean function to a Kafka `Predicate`. */
  private def buildPredicate[K, V](tester: (K, V) => Boolean): Predicate[K, V] =
    new Predicate[K, V] {
      override def test(key: K, value: V): Boolean = tester(key, value)
    }

  /** Adapts a Scala two-argument side-effecting function to a Kafka `ForeachAction`. */
  private def buildForeachAction[K, V](fn: (K, V) => Unit): ForeachAction[K, V] =
    new ForeachAction[K, V] {
      override def apply(key: K, value: V): Unit = fn(key, value)
    }

  /** Adapts a Scala one-argument function to a Kafka `ValueMapper`. */
  private def buildValueMapper[V1, V2](mapper: V1 => V2): ValueMapper[V1, V2] =
    new ValueMapper[V1, V2] {
      override def apply(value: V1): V2 = mapper(value)
    }

  /** Adapts a Scala two-argument function to a Kafka `KeyValueMapper`. */
  private def buildKeyValueMapper[K, V, R](mapper: (K, V) => R): KeyValueMapper[K, V, R] =
    new KeyValueMapper[K, V, R] {
      override def apply(key: K, value: V): R = mapper(key, value)
    }

  /** Converts a Scala pair into a Kafka `KeyValue`. */
  private def buildKeyValue[K, V](tuple: (K, V)): KeyValue[K, V] =
    new KeyValue[K, V](tuple._1, tuple._2)

  /** Delegates to `KStream.filter`, passing `tester` as a `Predicate`. */
  def filter[K, V](tester: (K, V) => Boolean)(s: KStream[K, V]): KStream[K, V] =
    s.filter(buildPredicate(tester))

  /** Delegates to `KStream.filterNot`, passing `tester` as a `Predicate`. */
  def filterNot[K, V](tester: (K, V) => Boolean)(s: KStream[K, V]): KStream[K, V] =
    s.filterNot(buildPredicate(tester))

  /** Delegates to `KStream.selectKey`, passing `mapper` as a `KeyValueMapper`. */
  def selectKey[K, V, R](mapper: (K, V) => R)(s: KStream[K, V]): KStream[R, V] =
    s.selectKey(buildKeyValueMapper(mapper))

  /**
   * Delegates to `KStream.map`. The pair returned by `mapper` is converted to
   * a Kafka `KeyValue` before being handed to the stream.
   */
  def map[K1, V1, K2, V2](mapper: (K1, V1) => (K2, V2))(s: KStream[K1, V1]): KStream[K2, V2] =
    s.map(
      buildKeyValueMapper[K1, V1, KeyValue[K2, V2]] { (key: K1, value: V1) =>
        buildKeyValue(mapper(key, value))
      }
    )

  /** Delegates to `KStream.mapValues`, passing `mapper` as a `ValueMapper`. */
  def mapValues[K, V1, V2](mapper: V1 => V2)(s: KStream[K, V1]): KStream[K, V2] =
    s.mapValues(buildValueMapper(mapper))

  /**
   * Delegates to `KStream.flatMap`. Every pair produced by `mapper` is
   * converted to a Kafka `KeyValue`, and the Scala collection is exposed to
   * Kafka as a Java `Iterable`.
   */
  def flatMap[K1, V1, K2, V2](mapper: (K1, V1) => scala.collection.Iterable[(K2, V2)])(s: KStream[K1, V1]): KStream[K2, V2] =
    s.flatMap(buildKeyValueMapper { (key: K1, value: V1) =>
      mapper(key, value).map(pair => new KeyValue[K2, V2](pair._1, pair._2)).asJava
    })

  /**
   * Delegates to `KStream.flatMapValues`, exposing the Scala collection
   * returned by `mapper` as a Java `Iterable`.
   */
  def flatMapValues[K, V1, V2](mapper: V1 => scala.collection.Iterable[V2])(s: KStream[K, V1]): KStream[K, V2] =
    s.flatMapValues(buildValueMapper((value: V1) => mapper(value).asJava))

  /**
   * Delegates to `KStream.branch`, converting each Scala function to a
   * `Predicate` in the order given.
   */
  def branch[K, V](predicates: ((K, V) => Boolean)*)(s: KStream[K, V]): Array[KStream[K, V]] = {
    val kafkaPredicates: Seq[Predicate[K, V]] = predicates.map(p => buildPredicate(p))
    s.branch(kafkaPredicates: _*)
  }

  /** Delegates to `KStream.through` with the given topic name. */
  def through[K, V](topic: String)(s: KStream[K, V]): KStream[K, V] =
    s.through(topic)

  /** Delegates to `KStream.foreach`, passing `fn` as a `ForeachAction`. */
  def foreach[K, V](fn: (K, V) => Unit)(s: KStream[K, V]): Unit =
    s.foreach(buildForeachAction(fn))

  /**
   * Extension methods mirroring the functions above, so they can be called
   * directly on a `KStream` with Scala function arguments.
   *
   * Declared as a value class so that using an extension method does not
   * allocate a wrapper instance per call.
   */
  implicit class Wrapper[K, V](private val s: KStream[K, V]) extends AnyVal {
    def filter(tester: (K, V) => Boolean): KStream[K, V] =
      KafkaStreamWrapper.filter(tester)(s)

    def filterNot(tester: (K, V) => Boolean): KStream[K, V] =
      KafkaStreamWrapper.filterNot(tester)(s)

    def selectKey[R](mapper: (K, V) => R): KStream[R, V] =
      KafkaStreamWrapper.selectKey(mapper)(s)

    def map[K2, V2](mapper: (K, V) => (K2, V2)): KStream[K2, V2] =
      KafkaStreamWrapper.map(mapper)(s)

    def mapValues[V2](mapper: V => V2): KStream[K, V2] =
      KafkaStreamWrapper.mapValues(mapper)(s)

    def flatMap[K2, V2](mapper: (K, V) => scala.collection.Iterable[(K2, V2)]): KStream[K2, V2] =
      KafkaStreamWrapper.flatMap(mapper)(s)

    def flatMapValues[V2](mapper: V => scala.collection.Iterable[V2]): KStream[K, V2] =
      KafkaStreamWrapper.flatMapValues(mapper)(s)

    def branch(predicates: ((K, V) => Boolean)*): Array[KStream[K, V]] =
      KafkaStreamWrapper.branch(predicates: _*)(s)

    def through(topic: String): KStream[K, V] =
      KafkaStreamWrapper.through(topic)(s)

    def foreach(fn: (K, V) => Unit): Unit =
      KafkaStreamWrapper.foreach(fn)(s)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment