@JorgenRingen
JorgenRingen / KafkaConsumer.java
Created July 25, 2019 08:48
Consume partition with initial offset using spring-boot-kafka
// Explicit partition assignment: read partition 0 starting at offset 0.
// topics and topicPartitions are mutually exclusive on @KafkaListener, so only topicPartitions is set.
@KafkaListener(groupId = "${groupId}",
        topicPartitions = {
                @TopicPartition(
                        topic = KafkaSpringBootHelloWorldApplication.TOPIC_USERS,
                        partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
        })
JorgenRingen / Outerjoin with windowing
Last active October 2, 2020 10:33
Kafka joining streams/tables with windowing
package org.example.profiles

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.kstream.ValueJoiner
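
The preview above stops at the imports, so the join itself is not shown. As a rough illustration of what a symmetric join window means, the plain-Java sketch below checks whether two same-key records fall inside a window: they are joinable when their timestamps differ by at most the window size, inclusive. The class and method names are hypothetical and are not part of the gist or of the Kafka Streams API.

```java
import java.time.Duration;

// Hypothetical helper for illustration only; it mirrors the windowing rule,
// it does not call Kafka Streams.
final class JoinWindowCheck {

    // True when two record timestamps (epoch millis) are at most `window` apart, inclusive.
    static boolean joinable(long leftTs, long rightTs, Duration window) {
        return Math.abs(leftTs - rightTs) <= window.toMillis();
    }

    public static void main(String[] args) {
        Duration window = Duration.ofSeconds(5);
        System.out.println(joinable(1_000L, 4_000L, window)); // prints true  (3 s apart)
        System.out.println(joinable(1_000L, 7_000L, window)); // prints false (6 s apart)
    }
}
```

In an outer join, a record that finds no partner inside the window is still emitted, with a null value for the missing side.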
JorgenRingen / KafkaConfig.kt
Created February 11, 2021 16:10
Set topic-config for internal changelog-topics in kafka streams
streamsBuilder.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("storeName"),
        Serdes.StringSerde(),
        JsonSerde<OrderV2>()
    )
        .withCachingEnabled()
        .withLoggingEnabled(
            mapOf(
                RETENTION_MS_CONFIG to Duration.ofDays(30).toMillis().toString(),
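
The preview is cut off inside the `mapOf(...)` call, so any remaining entries are unknown. As a minimal sketch of the one entry that is visible, the plain-Java snippet below builds the same string-to-string map that `withLoggingEnabled` receives. The class and method names are hypothetical; `"retention.ms"` is the literal value of `TopicConfig.RETENTION_MS_CONFIG`.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical helper for illustration; not part of the original gist.
final class ChangelogTopicConfig {

    // Literal value of org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG.
    static final String RETENTION_MS = "retention.ms";

    // Topic configs are passed as strings, so the duration is rendered in milliseconds.
    static Map<String, String> retention(Duration retention) {
        return Map.of(RETENTION_MS, String.valueOf(retention.toMillis()));
    }

    public static void main(String[] args) {
        System.out.println(retention(Duration.ofDays(30))); // prints {retention.ms=2592000000}
    }
}
```

Thirty days is 30 × 24 × 3600 × 1000 = 2,592,000,000 ms, which is the value the changelog topic would be created with.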