Last active
April 21, 2025 22:05
-
-
Save smaillns/e15852ba7f7207566e4425afdcbd8457 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Configuration | |
@ConfigurationProperties(prefix = "app.kafka.my-consumer") | |
@Getter | |
@Setter | |
@EqualsAndHashCode(callSuper = true) | |
public class MyKafkaConfig extends CommonKafkaConfig<String, SpecificRecord> { | |
@Autowired | |
private KafkaProperties kafkaProperties; | |
@Value ("${app.kafka.my-consumer.topic.retry}") | |
private String retryTopic; | |
@Value ("${app.kafka.my-consumer.topic.error}") | |
private String dltTopic; | |
@Bean | |
public ConsumerFactory<String, SpecificRecord> specificConsumerFactory() { | |
return consumerResource(kafkaProperties); | |
} | |
@Bean | |
public ProducerFactory<String, SpecificRecord> specificProducerFactory() { | |
return producerResource(kafkaProperties); | |
} | |
@Bean | |
public KafkaTemplate<String, SpecificRecord> specificKafkaTemplate(ProducerFactory<String, SpecificRecord> specificProducerFactory) { | |
return new KafkaTemplate<>(specificProducerFactory); | |
} | |
@Bean | |
public ConcurrentKafkaListenerContainerFactory<String, SpecificRecord> myListenerFactory( | |
ConsumerFactory<String, SpecificRecord> specificConsumerFactory, KafkaTemplate<String, SpecificRecord> specificKafkaTemplate, | |
DeadLetterPublishingRecoverer movementDeadLetter) { | |
return retryKafkaListenerContainerFactory(specificConsumerFactory, retryTopic, specificKafkaTemplate, movementDeadLetter, fixedBackOffMain); | |
} | |
@Bean | |
public ConcurrentKafkaListenerContainerFactory<String, SpecificRecord> myRetryListenerFactory( | |
ConsumerFactory<String, SpecificRecord> specificConsumerFactory, | |
DeadLetterPublishingRecoverer myDeadLetter) { | |
return retryKafkaListenerContainerFactory(specificConsumerFactory, myDeadLetter, fixedBackOffRetry); | |
} | |
@Bean | |
public DeadLetterPublishingRecoverer myDeadLetter(KafkaTemplate<String, SpecificRecord> specificKafkaTemplate) { | |
return new DeadLetterPublishingRecoverer(specificKafkaTemplate, | |
(consumerRecord, exception) -> new TopicPartition((dltTopic), consumerRecord.partition())); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment