Skip to content

Instantly share code, notes, and snippets.

@schroedermatt
Last active August 9, 2018 02:37
Show Gist options
  • Select an option

  • Save schroedermatt/3dca9ff75fa659b62a50c8458dfe1ed2 to your computer and use it in GitHub Desktop.

Select an option

Save schroedermatt/3dca9ff75fa659b62a50c8458dfe1ed2 to your computer and use it in GitHub Desktop.
bean configurations for using MockSchemaRegistryClient with @embeddedkafka
@Configuration
class MockSerdeConfig {
// KafkaProperties groups all properties prefixed with `spring.kafka`
private KafkaProperties props
MockSerdeConfig(KafkaProperties kafkaProperties) {
props = kafkaProperties
}
/**
* Mock schema registry bean used by Kafka Avro Serde since
* the @EmbeddedKafka setup doesn't include a schema registry.
* @return MockSchemaRegistryClient instance
*/
@Bean
MockSchemaRegistryClient schemaRegistryClient() {
new MockSchemaRegistryClient()
}
/**
* KafkaAvroSerializer that uses the MockSchemaRegistryClient
* @return KafkaAvroSerializer instance
*/
@Bean
KafkaAvroSerializer kafkaAvroSerializer() {
new KafkaAvroSerializer(schemaRegistryClient())
}
/**
* KafkaAvroDeserializer that uses the MockSchemaRegistryClient.
* The props must be provided so that specific.avro.reader: true
* is set. Without this, the consumer will receive GenericData records.
* @return KafkaAvroDeserializer instance
*/
@Bean
KafkaAvroDeserializer kafkaAvroDeserializer() {
new KafkaAvroDeserializer(schemaRegistryClient(), props.buildConsumerProperties())
}
/**
* Configures the kafka producer factory to use the overridden
* KafkaAvroDeserializer so that the MockSchemaRegistryClient
* is used rather than trying to reach out via HTTP to a schema registry
* @return DefaultKafkaProducerFactory instance
*/
@Bean
DefaultKafkaProducerFactory producerFactory() {
new DefaultKafkaProducerFactory(
props.buildProducerProperties(),
new StringSerializer(),
kafkaAvroSerializer()
)
}
/**
* Configures the kafka consumer factory to use the overridden
* KafkaAvroSerializer so that the MockSchemaRegistryClient
* is used rather than trying to reach out via HTTP to a schema registry
* @return DefaultKafkaConsumerFactory instance
*/
@Bean
DefaultKafkaConsumerFactory consumerFactory() {
new DefaultKafkaConsumerFactory(
props.buildConsumerProperties(),
new StringDeserializer(),
kafkaAvroDeserializer()
)
}
/**
* Configure the ListenerContainerFactory to use the overridden
* consumer factory so that the MockSchemaRegistryClient is used
* under the covers by all consumers when deserializing Avro data.
* @return ConcurrentKafkaListenerContainerFactory instance
*/
@Bean
ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory()
factory.setConsumerFactory(consumerFactory())
return factory
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment