-
-
Save qyf404/4e3cf275b1c96f02d6f094c2c4063c8c to your computer and use it in GitHub Desktop.
Using embedded Kafka in Spring Boot unit test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
spring:
  resources:
    # disable serving of static web files since this is a REST/Actuator only web app
    add-mappings: false
  kafka:
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import static org.junit.Assert.assertThat; | |
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasKey; | |
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue; | |
import static org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord; | |
import java.util.Map; | |
import org.apache.kafka.clients.consumer.Consumer; | |
import org.apache.kafka.clients.consumer.ConsumerConfig; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.apache.kafka.common.serialization.Deserializer; | |
import org.apache.kafka.common.serialization.StringDeserializer; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; | |
import org.springframework.boot.test.context.SpringBootTest; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; | |
import org.springframework.kafka.test.EmbeddedKafkaBroker; | |
import org.springframework.kafka.test.context.EmbeddedKafka; | |
import org.springframework.kafka.test.utils.KafkaTestUtils; | |
import org.springframework.test.context.junit4.SpringRunner; | |
@RunWith(SpringRunner.class) | |
// Test with a "real", but embedded Kafka instance, gives us the EmbeddedKafkaBroker to autowire | |
@EmbeddedKafka( | |
// We're only needing to test Kafka serializing interactions, so keep partitioning simple | |
partitions = 1, | |
// use some non-default topics to test via | |
topics = { | |
KafkaEgressTest.TOPIC_METRICS | |
}) | |
// Using @SpringBootTest mainly so we can get the standard properties binding bootstrap processing | |
// and the properties source. The loading and binding of {@link KafkaTopicProperties} is one of | |
// the main things we're testing in this suite. | |
@SpringBootTest( | |
// tell Spring Boot Kafka auto-config about the embedded kafka endpoints | |
properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", | |
// slice our unit test app context down to just these specific pieces | |
classes = { | |
// ...the service to test | |
OurService.class, | |
// ...use standard Sprint Boot kafka auto-config to give us KafkaTemplate, etc | |
KafkaAutoConfiguration.class, | |
// ...and our additional test config | |
KafkaProducerTest.TestConfig.class | |
} | |
) | |
public class KafkaProducerTest { | |
public static final String TOPIC_METRICS = "test.metrics.json"; | |
// Declare our own unit test Spring config | |
@Configuration | |
public static class TestConfig { | |
// Adjust our standard topic properties to point metrics at our test topic | |
@Bean | |
public AppProperties appProperties() { | |
final AppProperties properties = new AppProperties(); | |
properties.setMetrics(TOPIC_METRICS); | |
return properties; | |
} | |
} | |
// IntelliJ gets confused finding this broker bean when @SpringBootTest is activated | |
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") | |
// Autowire the kafka broker registered via @EmbeddedKafka | |
@Autowired | |
private EmbeddedKafkaBroker embeddedKafka; | |
// Autowire the service we're testing | |
@Autowired | |
OurService ourService; | |
@Test | |
public void testMetricsEncodedAsSent() { | |
ourService.send("tenant-1", KafkaMessageType.METRIC, "{\"id\":1}"); | |
final Consumer<String, String> consumer = buildConsumer( | |
StringDeserializer.class, | |
StringDeserializer.class | |
); | |
embeddedKafka.consumeFromEmbeddedTopics(consumer, TOPIC_METRICS); | |
final ConsumerRecord<String, String> record = getSingleRecord(consumer, TOPIC_METRICS, 500); | |
// Use Hamcrest matchers provided by spring-kafka-test | |
// https://docs.spring.io/spring-kafka/docs/2.2.4.RELEASE/reference/#hamcrest-matchers | |
assertThat(record, hasKey("tenant-1")); | |
assertThat(record, hasValue("{\"id\":1}")); | |
} | |
private <K,V> Consumer<K, V> buildConsumer(Class<? extends Deserializer> keyDeserializer, | |
Class<? extends Deserializer> valueDeserializer) { | |
// Use the procedure documented at https://docs.spring.io/spring-kafka/docs/2.2.4.RELEASE/reference/#embedded-kafka-annotation | |
final Map<String, Object> consumerProps = KafkaTestUtils | |
.consumerProps("testMetricsEncodedAsSent", "true", embeddedKafka); | |
// Since we're pre-sending the messages to test for, we need to read from start of topic | |
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | |
// We need to match the ser/deser used in expected application config | |
consumerProps | |
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName()); | |
consumerProps | |
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName()); | |
final DefaultKafkaConsumerFactory<K, V> consumerFactory = | |
new DefaultKafkaConsumerFactory<>(consumerProps); | |
return consumerFactory.createConsumer(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment