Last active
January 10, 2023 14:57
-
-
Save phillipuniverse/4b3d39cdcceb2363a14ebdcc170d9059 to your computer and use it in GitHub Desktop.
JUnit 5 integration test with Spring Cloud Stream and embedded Kafka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.demo; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
import org.springframework.cloud.stream.annotation.EnableBinding; | |
import org.springframework.cloud.stream.annotation.Input; | |
import org.springframework.cloud.stream.annotation.Output; | |
import org.springframework.cloud.stream.annotation.StreamListener; | |
import org.springframework.messaging.MessageChannel; | |
import org.springframework.messaging.SubscribableChannel; | |
import org.springframework.stereotype.Component; | |
import com.example.demo.DemoApplication.MessageRequestConsumer; | |
import com.example.demo.DemoApplication.MessageRequestProducer; | |
import com.fasterxml.jackson.annotation.JsonCreator; | |
import com.fasterxml.jackson.annotation.JsonProperty; | |
@EnableBinding({MessageRequestProducer.class, MessageRequestConsumer.class}) | |
@SpringBootApplication | |
public class DemoApplication { | |
public static void main(String[] args) { | |
SpringApplication.run(DemoApplication.class, args); | |
} | |
public static interface MessageRequestProducer { | |
public static final String CHANNEL = "messageRequestOutput"; | |
@Output(CHANNEL) | |
MessageChannel messageRequestOutput(); | |
} | |
public static interface MessageRequestConsumer { | |
public static final String CHANNEL = "messageRequestInput"; | |
@Input(CHANNEL) | |
SubscribableChannel messageRequestInput(); | |
} | |
@Component | |
public static class MessageRequestListener { | |
@StreamListener(MessageRequestConsumer.CHANNEL) | |
public void handle(MessageRequest req) { | |
System.out.println("Do something"); | |
} | |
} | |
public static class MessageRequest { | |
private String id; | |
@JsonCreator | |
public MessageRequest(@JsonProperty("id") String id) { | |
this.id = id; | |
} | |
public String getId() { | |
return this.id; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.demo; | |
import static org.mockito.ArgumentMatchers.argThat; | |
import static org.mockito.Mockito.timeout; | |
import static org.mockito.Mockito.verify; | |
import org.junit.jupiter.api.Test; | |
import org.mockito.AdditionalAnswers; | |
import org.mockito.Mockito; | |
import org.springframework.beans.BeansException; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.config.BeanPostProcessor; | |
import org.springframework.boot.test.context.SpringBootTest; | |
import org.springframework.boot.test.context.TestConfiguration; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.integration.support.MessageBuilder; | |
import org.springframework.kafka.test.context.EmbeddedKafka; | |
import org.springframework.test.context.TestPropertySource; | |
import com.example.demo.DemoApplication.MessageRequest; | |
import com.example.demo.DemoApplication.MessageRequestListener; | |
import com.example.demo.DemoApplication.MessageRequestProducer; | |
@SpringBootTest | |
// the log.dir here avoids notwriteableexceptions that occur if this tries to write on a normal | |
// fs outside of the build directory | |
@EmbeddedKafka(brokerProperties = "log.dir=target/${random.uuid}/embedded-kafka") | |
@TestPropertySource( | |
properties = { | |
// bridge between embedded Kafka and Spring Cloud Stream | |
"spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}", | |
// using real kafka | |
"spring.autoconfigure.exclude=org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration", | |
"spring.cloud.stream.bindings.messageRequestInput.group=consumer", | |
"spring.cloud.stream.bindings.messageRequestInput.destination=messages", | |
"spring.cloud.stream.bindings.messageRequestOutput.destination=messages" | |
}) | |
public class DemoApplicationTests { | |
@TestConfiguration | |
static class Config { | |
@Bean | |
public BeanPostProcessor messageRequestListenerPostProcessor() { | |
return new ProxiedMockPostProcessor(MessageRequestListener.class); | |
} | |
/** | |
* See https://github.com/spring-projects/spring-boot/issues/7033#issuecomment-393213222 for | |
* the rationale behind this. I want real functionality to happen in the proxied | |
* {@literal @}StreamListener, but I also want to directly validate that methods were called | |
* that I expected | |
* | |
* @author Phillip Verheyden (phillipuniverse) | |
*/ | |
static class ProxiedMockPostProcessor implements BeanPostProcessor { | |
private final Class<?> mockedClass; | |
public ProxiedMockPostProcessor(Class<?> mockedClass) { | |
this.mockedClass = mockedClass; | |
} | |
@Override | |
public Object postProcessAfterInitialization(Object bean, String beanName) | |
throws BeansException { | |
if (mockedClass.isInstance(bean)) { | |
return Mockito.mock(mockedClass, AdditionalAnswers.delegatesTo(bean)); | |
} | |
return bean; | |
} | |
} | |
} | |
@Autowired | |
private MessageRequestListener listener; | |
@Autowired | |
private MessageRequestProducer producer; | |
@Test | |
public void messageIsReceived() { | |
MessageRequest req = new MessageRequest("abc123"); | |
producer.messageRequestOutput().send(MessageBuilder | |
.withPayload(req) | |
.build()); | |
// the message actually gets received. Need to do a timeout because I cannot manually force | |
// a consumption of this message from Kafka. The default for timeout() is to check every | |
// 10ms up to the timeout | |
verify(listener, timeout(5000)) | |
.handle(argThat(m -> m.getId().equals(req.getId()))); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the Spring Cloud Stream + embedded Kafka JUnit 5 demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.6.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>
  <groupId>com.example</groupId>
  <artifactId>demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>demo</name>
  <description>Demo project for Spring Boot</description>
  <properties>
    <java.version>1.8</java.version>
    <spring-cloud.version>Greenwich.SR2</spring-cloud.version>
    <!--
      Overrides the JUnit Jupiter version managed by the Spring Boot parent so that the
      individually managed Jupiter modules resolve to the same version as the junit-jupiter
      aggregate artifact declared below, instead of mixing versions.
    -->
    <junit-jupiter.version>5.4.2</junit-jupiter.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
      <exclusions>
        <!-- JUnit 4 is excluded; the tests run on JUnit 5 (Jupiter). -->
        <exclusion>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter</artifactId>
      <scope>test</scope>
      <version>${junit-jupiter.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-test-support</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I am getting the error below while executing the Kafka test case. Any suggestions on how to fix the issue?
2020-09-11 15:55:17 org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/:61252. Will not attempt to authenticate using SASL (unknown error)
2020-09-11 15:55:17 logType=WARN org.apache.zookeeper.ClientCnxn - Session 0x0 for server 127.0.0.1/:61252, unexpected error, closing socket connection and attempting reconnect
java.nio.channels.UnresolvedAddressException: null
at java.base/sun.nio.ch.Net.checkAddress(Net.java:139)
at java.base/sun.nio.ch.SocketChannelImpl.checkRemote(SocketChannelImpl.java:727)
at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:741)
at org.apache.zookeeper.ClientCnxnSocketNIO.registerAndConnect(ClientCnxnSocketNIO.java:277)
at org.apache.zookeeper.ClientCnxnSocketNIO.connect(ClientCnxnSocketNIO.java:287)
at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1021)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1064)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server '127.0.0.1:61252' with timeout of 6000 ms