Last active
August 9, 2018 02:37
-
-
Save schroedermatt/3dca9ff75fa659b62a50c8458dfe1ed2 to your computer and use it in GitHub Desktop.
Bean configurations for using MockSchemaRegistryClient with @EmbeddedKafka
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Spring configuration that wires Kafka Avro serialization to a
 * MockSchemaRegistryClient, so producers and consumers created from these
 * beans do not need a real schema registry (e.g. when running against
 * {@code @EmbeddedKafka}).
 */
@Configuration
class MockSerdeConfig {

    // KafkaProperties groups all properties prefixed with `spring.kafka`
    private final KafkaProperties props

    /**
     * @param kafkaProperties the Kafka properties used to build the
     *        producer and consumer configuration maps below
     */
    MockSerdeConfig(KafkaProperties kafkaProperties) {
        props = kafkaProperties
    }

    /**
     * Mock schema registry bean used by Kafka Avro Serde since
     * the @EmbeddedKafka setup doesn't include a schema registry.
     *
     * @return MockSchemaRegistryClient instance
     */
    @Bean
    MockSchemaRegistryClient schemaRegistryClient() {
        new MockSchemaRegistryClient()
    }

    /**
     * KafkaAvroSerializer that uses the MockSchemaRegistryClient.
     *
     * @return KafkaAvroSerializer instance
     */
    @Bean
    KafkaAvroSerializer kafkaAvroSerializer() {
        new KafkaAvroSerializer(schemaRegistryClient())
    }

    /**
     * KafkaAvroDeserializer that uses the MockSchemaRegistryClient.
     * The props must be provided so that specific.avro.reader: true
     * is set. Without this, the consumer will receive GenericData records.
     *
     * @return KafkaAvroDeserializer instance
     */
    @Bean
    KafkaAvroDeserializer kafkaAvroDeserializer() {
        new KafkaAvroDeserializer(schemaRegistryClient(), props.buildConsumerProperties())
    }

    /**
     * Configures the kafka producer factory to use the overridden
     * KafkaAvroSerializer so that the MockSchemaRegistryClient
     * is used rather than trying to reach out via HTTP to a schema registry.
     * Keys are serialized with a StringSerializer.
     *
     * @return DefaultKafkaProducerFactory instance
     */
    @Bean
    DefaultKafkaProducerFactory producerFactory() {
        new DefaultKafkaProducerFactory(
            props.buildProducerProperties(),
            new StringSerializer(),
            kafkaAvroSerializer()
        )
    }

    /**
     * Configures the kafka consumer factory to use the overridden
     * KafkaAvroDeserializer so that the MockSchemaRegistryClient
     * is used rather than trying to reach out via HTTP to a schema registry.
     * Keys are deserialized with a StringDeserializer.
     *
     * @return DefaultKafkaConsumerFactory instance
     */
    @Bean
    DefaultKafkaConsumerFactory consumerFactory() {
        new DefaultKafkaConsumerFactory(
            props.buildConsumerProperties(),
            new StringDeserializer(),
            kafkaAvroDeserializer()
        )
    }

    /**
     * Configure the ListenerContainerFactory to use the overridden
     * consumer factory so that the MockSchemaRegistryClient is used
     * under the covers by all consumers when deserializing Avro data.
     *
     * @return ConcurrentKafkaListenerContainerFactory instance
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory()
        factory.setConsumerFactory(consumerFactory())
        return factory
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment