Skip to content

Instantly share code, notes, and snippets.

@j-tim
Created April 1, 2020 13:38
Show Gist options
  • Save j-tim/46a0d3d23d14d39e218d02d9950f8463 to your computer and use it in GitHub Desktop.
Save j-tim/46a0d3d23d14d39e218d02d9950f8463 to your computer and use it in GitHub Desktop.
Spring Kafka Configuration to demonstrate how to configure the ErrorHandlingDeserializer2
package io.stockgeeks.kafka.stock.tick.consumer.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.LoggingErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Map;
/**
* Spring Kafka configuration to demonstrate how to configure the ErrorHandlingDeserializer2.
*/
@Configuration
public class KafkaConfiguration {
private final KafkaProperties kafkaProperties;
public KafkaConfiguration(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
Map<String, Object> configProperties = kafkaProperties.buildConsumerProperties();
// This is the important part! To configure the ErrorHandlingDeserializer2
// and key and value deserializer delegate classes
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
configProperties.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
configProperties.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
ConsumerFactory<Object, Object> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(configProperties);
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
// This is the default but just to explain where you can configure the error handler
factory.setErrorHandler(new LoggingErrorHandler());
configurer.configure(factory, kafkaConsumerFactory);
return factory;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment