Skip to content

Instantly share code, notes, and snippets.

@bastman
Last active September 3, 2018 10:49
Show Gist options
  • Save bastman/2bc9e5998532403c877a9e1b609eb594 to your computer and use it in GitHub Desktop.
Save bastman/2bc9e5998532403c877a9e1b609eb594 to your computer and use it in GitHub Desktop.
spring_kafka_without_annotations
@Bean
fun container(
): KafkaMessageListenerContainer<String, String> {
val listener = object : AcknowledgingMessageListener<String, String> {
override fun onMessage(record: ConsumerRecord<String, String>, acknowledgment: Acknowledgment) {
logger.info { "Received: ${record.value()}" }
val event: NewMailEvent = deserializeEvent(eventJson = record.value()) ?: return
try {
newMailEventHandler.handle(newMailEvent = event)
} catch (all: Exception) {
logger.error { "Failed to handle event: $event" }
} finally {
acknowledgment.acknowledge()
}
}
}
val containerProperties = ContainerProperties("mytopic")
containerProperties.messageListener = listener
// containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE)
println(containerProperties.ackMode)
val consumerFactory = DefaultKafkaConsumerFactory<String, String>(
consumerConfigs(), StringDeserializer(), StringDeserializer()
)
.also { logger.info { "Create Consumer Factory. isAutoCommit=${it.isAutoCommit}" } }
val container: KafkaMessageListenerContainer<String, String> =
KafkaMessageListenerContainer(consumerFactory, containerProperties)
container.isAutoStartup = autoStartEnabled
return container
}
// see: http://www.douevencode.com/articles/2017-12/spring-kafka-without-annotations/
// see: https://github.com/SpringOnePlatform2016/grussell-spring-kafka
// https://www.linkedin.com/pulse/few-experiments-spring-kafka-vishal-mahuli/
// https://www.linkedin.com/pulse/flow-control-kafka-queueconsumer-pausing-vishal-mahuli/
// https://github.com/egovernments/egov-services/tree/master/core/egov-indexer/src/main/java/org/egov/infra/indexer/consumer
@PostConstruct
fun fromScratch() {
val kafkaConsumerFactory = DefaultKafkaConsumerFactory(
consumerConfigs(),
StringDeserializer(),
StringDeserializer())
val containerProperties = ContainerProperties("my-topic")
containerProperties.messageListener = MessageListener<String, String> { record ->
logger.info { "on Message: $record" }
}
// blend container with full consumer configuration and you get...
// MessageListenerContainer!
val container = ConcurrentMessageListenerContainer<String, String>(
kafkaConsumerFactory,
containerProperties
)
//container.set
container.start()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment