Skip to content

Instantly share code, notes, and snippets.

@bastman
Created January 25, 2019 08:30
Show Gist options
  • Save bastman/9ddc7c36a1f4d38766d4837e101fc5f6 to your computer and use it in GitHub Desktop.
Save bastman/9ddc7c36a1f4d38766d4837e101fc5f6 to your computer and use it in GitHub Desktop.
spring-kafka-consumer: seek to beginning example
/**
 * Kafka listener that implements [ConsumerSeekAware].
 *
 * On partition assignment it can seek every assigned partition to its beginning;
 * this happens at most once, guarded by the `fromBeginning` and `soughtToBeginning`
 * members that the overrides below read.
 *
 * NOTE(review): this excerpt shows neither the class body braces nor the declarations
 * of `logger`, `fromBeginning` and `soughtToBeginning` — confirm them in the full file.
 */
class MyListener() : ConsumerSeekAware
/**
 * Handles a partition assignment.
 *
 * Always logs the assignment. If `fromBeginning` is set and no earlier call has already
 * performed the reset, every assigned partition is sought to its beginning and
 * `soughtToBeginning` is set so the reset is not repeated.
 *
 * @param assignments the assigned partitions (map keys); must be non-null when a reset is due
 * @param callback used to issue the seeks; must be non-null when a reset is due
 * @throws IllegalStateException if a reset is due but [assignments] or [callback] is null
 */
override fun onPartitionsAssigned(
    assignments: MutableMap<TopicPartition, Long>?,
    callback: ConsumerSeekAware.ConsumerSeekCallback?
) {
    logger.info { "on partitions assigned: ${assignments?.toMap()}" }

    // Nothing to do unless a reset was requested and has not been performed yet.
    if (!fromBeginning || soughtToBeginning.get()) return

    // Validate in this order: missing assignments are reported before a missing callback.
    val partitions = checkNotNull(assignments) { "no assignments found" }.keys.toList()
    val seeker = checkNotNull(callback) { "seek callback not found" }

    for (partition in partitions) {
        seeker.seekToBeginning(partition.topic(), partition.partition())
    }

    logger.warn { "Kafka reset partition offset to beginning ! sought to beginning" }
    soughtToBeginning.set(true)
}
/**
 * Receives the seek callback. This implementation only writes an info log entry
 * and does not retain [callback].
 */
override fun registerSeekCallback(callback: ConsumerSeekAware.ConsumerSeekCallback?) {
    logger.info { "Kafka Seek" }
}
/**
 * Idle-container hook. This implementation only writes an info log entry;
 * neither [assignments] nor [callback] is used.
 */
override fun onIdleContainer(
    assignments: MutableMap<TopicPartition, Long>?,
    callback: ConsumerSeekAware.ConsumerSeekCallback?
) {
    logger.info { "Kafka Idle" }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment