Skip to content

Instantly share code, notes, and snippets.

@smaillns
Last active April 21, 2025 22:05
Show Gist options
  • Save smaillns/e15852ba7f7207566e4425afdcbd8457 to your computer and use it in GitHub Desktop.
Save smaillns/e15852ba7f7207566e4425afdcbd8457 to your computer and use it in GitHub Desktop.
@Configuration
@ConfigurationProperties(prefix = "app.kafka.my-consumer")
@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
public class MyKafkaConfig extends CommonKafkaConfig<String, SpecificRecord> {
@Autowired
private KafkaProperties kafkaProperties;
@Value ("${app.kafka.my-consumer.topic.retry}")
private String retryTopic;
@Value ("${app.kafka.my-consumer.topic.error}")
private String dltTopic;
@Bean
public ConsumerFactory<String, SpecificRecord> specificConsumerFactory() {
return consumerResource(kafkaProperties);
}
@Bean
public ProducerFactory<String, SpecificRecord> specificProducerFactory() {
return producerResource(kafkaProperties);
}
@Bean
public KafkaTemplate<String, SpecificRecord> specificKafkaTemplate(ProducerFactory<String, SpecificRecord> specificProducerFactory) {
return new KafkaTemplate<>(specificProducerFactory);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SpecificRecord> myListenerFactory(
ConsumerFactory<String, SpecificRecord> specificConsumerFactory, KafkaTemplate<String, SpecificRecord> specificKafkaTemplate,
DeadLetterPublishingRecoverer movementDeadLetter) {
return retryKafkaListenerContainerFactory(specificConsumerFactory, retryTopic, specificKafkaTemplate, movementDeadLetter, fixedBackOffMain);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SpecificRecord> myRetryListenerFactory(
ConsumerFactory<String, SpecificRecord> specificConsumerFactory,
DeadLetterPublishingRecoverer myDeadLetter) {
return retryKafkaListenerContainerFactory(specificConsumerFactory, myDeadLetter, fixedBackOffRetry);
}
@Bean
public DeadLetterPublishingRecoverer myDeadLetter(KafkaTemplate<String, SpecificRecord> specificKafkaTemplate) {
return new DeadLetterPublishingRecoverer(specificKafkaTemplate,
(consumerRecord, exception) -> new TopicPartition((dltTopic), consumerRecord.partition()));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment