Created
April 1, 2020 13:38
-
-
Save j-tim/46a0d3d23d14d39e218d02d9950f8463 to your computer and use it in GitHub Desktop.
Spring Kafka Configuration to demonstrate how to configure the ErrorHandlingDeserializer2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.stockgeeks.kafka.stock.tick.consumer.config; | |
import org.apache.kafka.clients.consumer.ConsumerConfig; | |
import org.apache.kafka.common.serialization.StringDeserializer; | |
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer; | |
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; | |
import org.springframework.kafka.core.ConsumerFactory; | |
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; | |
import org.springframework.kafka.listener.LoggingErrorHandler; | |
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2; | |
import io.confluent.kafka.serializers.KafkaAvroDeserializer; | |
import java.util.Map; | |
/** | |
* Spring Kafka configuration to demonstrate how to configure the ErrorHandlingDeserializer2. | |
*/ | |
@Configuration | |
public class KafkaConfiguration { | |
private final KafkaProperties kafkaProperties; | |
public KafkaConfiguration(KafkaProperties kafkaProperties) { | |
this.kafkaProperties = kafkaProperties; | |
} | |
@Bean | |
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer) { | |
Map<String, Object> configProperties = kafkaProperties.buildConsumerProperties(); | |
// This is the important part! To configure the ErrorHandlingDeserializer2 | |
// and key and value deserializer delegate classes | |
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class); | |
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class); | |
configProperties.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, StringDeserializer.class); | |
configProperties.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class); | |
ConsumerFactory<Object, Object> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(configProperties); | |
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); | |
// This is the default but just to explain where you can configure the error handler | |
factory.setErrorHandler(new LoggingErrorHandler()); | |
configurer.configure(factory, kafkaConsumerFactory); | |
return factory; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment