Last active
June 21, 2023 10:15
-
-
Save vzickner/577c53164a97b9918a49e6c0235813f4 to your computer and use it in GitHub Desktop.
Sample Test Case with Embedded Kafka https://blog.mimacom.com/testing-apache-kafka-with-spring-boot-junit5/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<parent> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-parent</artifactId> | |
<version>2.2.5.RELEASE</version> | |
<relativePath/> <!-- lookup parent from repository --> | |
</parent> | |
<groupId>com.example</groupId> | |
<artifactId>demo</artifactId> | |
<version>0.0.1-SNAPSHOT</version> | |
<name>demo</name> | |
<description>Demo project for Spring Boot</description> | |
<properties> | |
<java.version>1.8</java.version> | |
</properties> | |
<dependencies> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.kafka</groupId> | |
<artifactId>spring-kafka</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-test</artifactId> | |
<scope>test</scope> | |
<exclusions> | |
<exclusion> | |
<groupId>org.junit.vintage</groupId> | |
<artifactId>junit-vintage-engine</artifactId> | |
</exclusion> | |
</exclusions> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.kafka</groupId> | |
<artifactId>spring-kafka-test</artifactId> | |
<scope>test</scope> | |
</dependency> | |
</dependencies> | |
<build> | |
<plugins> | |
<plugin> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-maven-plugin</artifactId> | |
</plugin> | |
</plugins> | |
</build> | |
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.demo; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.apache.kafka.clients.producer.Producer; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.apache.kafka.common.serialization.StringDeserializer; | |
import org.apache.kafka.common.serialization.StringSerializer; | |
import org.junit.jupiter.api.AfterAll; | |
import org.junit.jupiter.api.BeforeAll; | |
import org.junit.jupiter.api.Test; | |
import org.junit.jupiter.api.TestInstance; | |
import org.junit.jupiter.api.extension.ExtendWith; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; | |
import org.springframework.kafka.core.DefaultKafkaProducerFactory; | |
import org.springframework.kafka.listener.ContainerProperties; | |
import org.springframework.kafka.listener.KafkaMessageListenerContainer; | |
import org.springframework.kafka.listener.MessageListener; | |
import org.springframework.kafka.test.EmbeddedKafkaBroker; | |
import org.springframework.kafka.test.context.EmbeddedKafka; | |
import org.springframework.kafka.test.utils.ContainerTestUtils; | |
import org.springframework.kafka.test.utils.KafkaTestUtils; | |
import org.springframework.test.context.junit.jupiter.SpringExtension; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
import static org.assertj.core.api.Assertions.assertThat; | |
@EmbeddedKafka | |
@ExtendWith(SpringExtension.class) | |
@TestInstance(TestInstance.Lifecycle.PER_CLASS) | |
public class SimpleKafkaTest { | |
private static final String TOPIC = "domain-events"; | |
@Autowired | |
private EmbeddedKafkaBroker embeddedKafkaBroker; | |
BlockingQueue<ConsumerRecord<String, String>> records; | |
KafkaMessageListenerContainer<String, String> container; | |
@BeforeAll | |
void setUp() { | |
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker)); | |
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()); | |
ContainerProperties containerProperties = new ContainerProperties(TOPIC); | |
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties); | |
records = new LinkedBlockingQueue<>(); | |
container.setupMessageListener((MessageListener<String, String>) records::add); | |
container.start(); | |
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic()); | |
} | |
@AfterAll | |
void tearDown() { | |
container.stop(); | |
} | |
@Test | |
public void kafkaSetup_withTopic_ensureSendMessageIsReceived() throws Exception { | |
// Arrange | |
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker)); | |
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer(); | |
// Act | |
producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "{\"event\":\"Test Event\"}")); | |
producer.flush(); | |
// Assert | |
ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS); | |
assertThat(singleRecord).isNotNull(); | |
assertThat(singleRecord.key()).isEqualTo("my-aggregate-id"); | |
assertThat(singleRecord.value()).isEqualTo("{\"event\":\"Test Event\"}"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
hello, this example helped me a lot thanks. i have one question about Embedded Kafka. suppose i have few different test suites. how can i share the same embedded kafka broker? i think it will be redundant to start one for each test.
thanks