Last active
January 30, 2020 07:22
-
-
Save antonmry/ab97b6d0001903713aaec42735d1cd81 to your computer and use it in GitHub Desktop.
Spring Boot application with JUnit 5 tests using EmbeddedKafka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Gradle build for a Spring Boot application whose JUnit 5 tests run against an embedded Kafka broker.
plugins {
    id 'org.springframework.boot' version '2.2.4.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

// Single source of truth for the Kafka-related versions. The client, broker and
// test-classifier jars must all use the same version, so it is declared only once.
ext {
    springKafkaVersion = '2.4.1.RELEASE'
    kafkaVersion = '2.4.0'
}

configurations {
    developmentOnly
    // Anything declared as developmentOnly is also available on the runtime classpath.
    runtimeClasspath {
        extendsFrom developmentOnly
    }
    // Annotation processors are also visible at compile time.
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    // SpringBoot
    implementation 'org.springframework.boot:spring-boot-starter-web'

    // Kafka: https://docs.spring.io/spring-kafka/reference/html/#deps-for-24x
    implementation "org.springframework.kafka:spring-kafka:${springKafkaVersion}"
    implementation "org.apache.kafka:kafka-clients:${kafkaVersion}"
    testImplementation("org.springframework.kafka:spring-kafka-test:${springKafkaVersion}") {
        // Drop the Scala 2.11 broker artifact; the Scala 2.12 build is declared explicitly below.
        exclude module: 'kafka_2.11'
    }
    // Broker and client jars plus their "test" classifier variants.
    // NOTE(review): presumably required by the embedded broker per the linked docs - confirm.
    testImplementation "org.apache.kafka:kafka-clients:${kafkaVersion}:test"
    testImplementation "org.apache.kafka:kafka_2.12:${kafkaVersion}"
    testImplementation "org.apache.kafka:kafka_2.12:${kafkaVersion}:test"

    // Dev
    compileOnly 'org.projectlombok:lombok'
    testCompileOnly 'org.projectlombok:lombok'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
    annotationProcessor 'org.projectlombok:lombok'
    testAnnotationProcessor 'org.projectlombok:lombok'

    // Testing
    testImplementation('org.springframework.boot:spring-boot-starter-test')
}

test {
    // Run tests on the JUnit Platform (JUnit 5).
    useJUnitPlatform()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example; | |
import lombok.*; | |
@Data | |
@NoArgsConstructor(access = AccessLevel.PRIVATE, force = true) | |
@AllArgsConstructor | |
public class Foo2 { | |
private String foo; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
@SpringBootApplication | |
@Slf4j | |
public class MyApplication { | |
public static void main(String[] args) { | |
SpringApplication.run(MyApplication.class); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example; | |
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.util.backoff.FixedBackOff;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertTrue;
@EmbeddedKafka(topics = {"topic1", "topic1.DLT"}, | |
bootstrapServersProperty = "spring.kafka.bootstrap-servers") | |
@SpringBootTest | |
@Slf4j | |
public class KafkaTests { | |
@Autowired | |
private KafkaTemplate<Object, Object> template; | |
@Bean | |
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory( | |
ConcurrentKafkaListenerContainerFactoryConfigurer configurer, | |
ConsumerFactory<Object, Object> kafkaConsumerFactory, | |
KafkaTemplate<Object, Object> template) { | |
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); | |
configurer.configure(factory, kafkaConsumerFactory); | |
factory.setErrorHandler(new SeekToCurrentErrorHandler( | |
// dead-letter after 3 tries | |
new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 2))); | |
factory.setMessageConverter(new StringJsonMessageConverter()); | |
return factory; | |
} | |
@Bean | |
public RecordMessageConverter converter() { | |
return new StringJsonMessageConverter(); | |
} | |
final CountDownLatch latch = new CountDownLatch(4); | |
@KafkaListener(id = "fooGroup", topics = "topic1", containerFactory = "kafkaListenerContainerFactory") | |
public void listen(Foo2 foo) { | |
log.info("Received: " + foo); | |
latch.countDown(); | |
if (foo.getFoo().startsWith("fail")) { | |
throw new RuntimeException("failed"); | |
} | |
} | |
@KafkaListener(id = "dltGroup", topics = "topic1.DLT") | |
public void dltListen(String in) { | |
log.info("Received from DLT: " + in); | |
} | |
@Test | |
public void testKafka() throws Exception { | |
// Send | |
template.send("topic1", new Foo2("foo")); | |
template.send("topic1", new Foo2("bar")); | |
template.send("topic1", new Foo2("baz")); | |
template.send("topic1", new Foo2("qux")); | |
template.flush(); | |
assertTrue(latch.await(60, TimeUnit.SECONDS)); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment