Skip to content

Instantly share code, notes, and snippets.

@JorgenRingen
Last active October 2, 2020 10:33
Show Gist options
  • Save JorgenRingen/776fff1c9c614b44b5751b633db5bc16 to your computer and use it in GitHub Desktop.
Save JorgenRingen/776fff1c9c614b44b5751b633db5bc16 to your computer and use it in GitHub Desktop.
Kafka joining streams/tables with windowing
package org.example.profiles
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.kstream.ValueJoiner
import org.slf4j.LoggerFactory
import java.lang.invoke.MethodHandles
import java.time.Duration
import java.util.*
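/**
* Creating the topics (3 partitions each, replication factor 1, local ZooKeeper):
* kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic entity.foo.key
* kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic entity.bar.key
*
* Writing (each input line is "key:value"; the example below sends key "1" with value "hello1".
* Run the same command with --topic entity.bar.key to send the matching bar record):
* kafka-console-producer --topic entity.foo.key --broker-list localhost:9092 --property "parse.key=true" --property "key.separator=:"
* >1:hello1
*/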
// Topic that supplies the "foo" (left-hand) records of the join built in main().
const val fooTopic: String = "entity.foo.key"

// Topic that supplies the "bar" (right-hand) records of the join built in main().
const val barTopic: String = "entity.bar.key"
// File-level SLF4J logger. MethodHandles.lookup().lookupClass() yields the class that
// performs the lookup, so the logger is named after the class hosting these top-level
// declarations without hard-coding its name.
val logger: org.slf4j.Logger =
    LoggerFactory.getLogger(
        MethodHandles.lookup().lookupClass()
    )
/**
 * Runs a small Kafka Streams application that joins [fooTopic] with [barTopic] by record key.
 *
 * Both topics are read as streams of String keys and String values. Every foo record is
 * outer-joined with bar records using a 10-second [JoinWindows] window; the joined value is a
 * `Pair(foo, bar)` in which either side may be null. Pairs with a missing side are dropped by
 * the final filter, so only complete pairs reach the "Joined successfully" log line.
 * The topology has no sink: results are only logged.
 *
 * Expects a broker at `localhost:9092`.
 */
fun main() {
    val properties = Properties().apply {
        // The timestamp suffix gives every run its own application id and state directory,
        // so no run shares an id or local state with an earlier one.
        put(StreamsConfig.APPLICATION_ID_CONFIG, "joining-example-${System.currentTimeMillis()}")
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
        put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams/${System.currentTimeMillis()}")
    }

    val streamsBuilder = StreamsBuilder()
    val fooStream = streamsBuilder.stream(fooTopic, Consumed.with(Serdes.String(), Serdes.String()))
    val barStream = streamsBuilder.stream(barTopic, Consumed.with(Serdes.String(), Serdes.String()))

    // JOIN STREAMS
    fooStream
        .peek { _, _ -> logger.info("Received foo, waiting 10 seconds for bar") }
        // Outer join: a result is emitted even when only one side is present, leaving
        // the other half of the pair null.
        .outerJoin(barStream, { value1, value2 -> value1 to value2 }, JoinWindows.of(Duration.ofSeconds(10)))
        .peek { _, value ->
            if (value.second == null) {
                logger.warn("Bar was null when foo arrived, waiting 10 seconds....")
            }
        }
        // Keep only complete pairs; one-sided outer-join results stop here.
        .filter { _, value -> value.first != null && value.second != null }
        .peek { _, value -> logger.info("Joined successfully $value") }

    // JOIN TABLES
    // NOTE(review): as written this variant still builds KStreams via stream() and uses
    // JoinWindows, so it is not a table join. A KTable version would presumably use
    // streamsBuilder.table(...) and a non-windowed join - confirm before enabling.
    // val fooTable = streamsBuilder.stream(fooTopic, Consumed.with(Serdes.String(), Serdes.String()))
    // val barTable = streamsBuilder.stream(barTopic, Consumed.with(Serdes.String(), Serdes.String()))
    //
    // fooTable
    // .peek { key, value -> logger.info("Received foo, waiting 10 seconds for bar") }
    // .outerJoin(barTable, { value1, value2 -> value1 to value2 }, JoinWindows.of(Duration.ofSeconds(10)))
    // .peek { key, value ->
    // if (value.second == null) {
    // logger.warn("Bar was null when foo arrived, waiting 10 seconds....")
    // }
    // }
    // .filter { key, value -> value.first != null && value.second != null }
    // .peek { key, value -> logger.info("Joined successfully $value") }

    val kafkaStreams = KafkaStreams(streamsBuilder.build(), properties)

    // Register the hook before starting so the instance is closed on JVM shutdown
    // even if the shutdown is requested while start() is still in progress.
    Runtime.getRuntime().addShutdownHook(Thread { kafkaStreams.close() })

    kafkaStreams.start()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment