@QuarkusTest
@QuarkusTestResource(ContainerKafkaTestResource.class)
@Stereotype
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@DisabledIfSystemProperty(named = "container-enabled", matches = "false")
public @interface QuarkusContainerKafkaTest {}
public class ContainerKafkaTestResource implements QuarkusTestResourceLifecycleManager {

    private static final KafkaContainer kafka = new KafkaContainer();

    /**
     * @return A map of system properties that should be set for the running tests
     */
    @Override
    public Map<String, String> start() {
        kafka.start();
KafkaStreams streams = new KafkaStreams(topology, config);
// other configurations
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
    // examine the throwable and do something
});
streams.start();
public class CustomExceptionHandler implements ProductionExceptionHandler {

    private ExceptionService exceptionService;

    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record, final Exception exception) {
        boolean shouldContinue = exceptionService.handleProductionException(record, exception);
        return shouldContinue ? CONTINUE : FAIL;
    }
@Component
public class SpringContext implements ApplicationContextAware {

    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    public static <T> T getBean(Class<T> beanClass) {
plugins {
    id 'java'
    id 'com.diffplug.gradle.spotless' version '3.26.1'
    // other plugins
}

repositories { .. }

dependencies { .. }
@Bean
RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate()

    // wait one second (1000 ms) between attempts
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
    fixedBackOffPolicy.setBackOffPeriod(1000L)
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy)

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy()
    retryPolicy.setMaxAttempts(2)
ExponentialBackOffPolicy: increases the back-off period exponentially. The initial interval and multiplier are configurable.
ExponentialRandomBackOffPolicy: chooses a random multiple of the interval that would come from a simple deterministic exponential. This has been shown to be useful at least in testing scenarios where a test needing many retries generates excessive contention.
FixedBackOffPolicy: pauses for a fixed period of time (using Sleeper.sleep(long)) before continuing.
NoBackOffPolicy: performs all retries one after the other without pause.
UniformRandomBackOffPolicy: pauses for a random period of time before continuing.
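
To make the difference between the fixed and exponential policies concrete, here is a minimal plain-Java sketch of the arithmetic only. It does not use Spring Retry; the class `BackOffSketch`, its method names, and the cap parameter `maxMillis` are hypothetical and chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring Retry: computes the pause before each retry.
final class BackOffSketch {

    // Fixed back-off: the same pause before every retry.
    public static long fixedInterval(long periodMillis) {
        return periodMillis;
    }

    // Exponential back-off: initialMillis * multiplier^retryIndex, capped at maxMillis.
    // The cap is an assumption of this sketch so the pause cannot grow without bound.
    public static long exponentialInterval(long initialMillis, double multiplier,
                                           long maxMillis, int retryIndex) {
        double interval = initialMillis * Math.pow(multiplier, retryIndex);
        return (long) Math.min(interval, (double) maxMillis);
    }

    // Pauses before retries 0..retries-1 under the exponential scheme.
    public static List<Long> exponentialSchedule(long initialMillis, double multiplier,
                                                 long maxMillis, int retries) {
        List<Long> schedule = new ArrayList<>();
        for (int i = 0; i < retries; i++) {
            schedule.add(exponentialInterval(initialMillis, multiplier, maxMillis, i));
        }
        return schedule;
    }

    public static void main(String[] args) {
        // Initial pause 100 ms, doubling each time, never longer than 1000 ms.
        System.out.println(exponentialSchedule(100, 2.0, 1000, 5)); // [100, 200, 400, 800, 1000]
    }
}
```

A random variant would multiply each of these intervals by a random factor, which spreads out clients that would otherwise retry at the same moment.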
AlwaysRetryPolicy: always permits a retry. It is a subclass of NeverRetryPolicy and is mainly used as a base for other policies (e.g. a test stub).
CircuitBreakerRetryPolicy: trips the circuit open after a given number of failures and keeps it open until a set timeout elapses.
CompositeRetryPolicy: mixes and matches multiple policies (they are called in the order given).
ExceptionClassifierRetryPolicy: applies different policies to different exception types.
ExpressionRetryPolicy: a subclass of SimpleRetryPolicy that evaluates an expression against the last thrown exception.
NeverRetryPolicy: allows the first attempt but never permits a retry.
SimpleRetryPolicy: retries a fixed number of times for a set of named exceptions (and their subclasses).
TimeoutRetryPolicy: allows a retry as long as it hasn’t timed out (the clock is started on a call to open(RetryContext)).
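
The fixed-attempt behaviour described for SimpleRetryPolicy, together with a fallback once attempts run out, can be sketched in plain Java as follows. This is not Spring Retry code; `RetrySketch` and `execute` are hypothetical names, and the sketch assumes that the attempt limit counts the first call as well as the retries.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

// Hypothetical helper, not part of Spring Retry: a bare retry loop with a recovery fallback.
final class RetrySketch {

    // Runs the action up to maxAttempts times in total (first call included).
    // If every attempt throws, the last exception is handed to the recovery function.
    public static <T> T execute(Callable<T> action, int maxAttempts,
                                Function<Exception, T> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again if attempts remain
            }
        }
        return recovery.apply(last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = execute(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("transient failure " + calls[0]);
            }
            return "ok after " + calls[0] + " calls";
        }, 3, e -> "recovered");
        System.out.println(result); // ok after 3 calls
    }
}
```

In these terms, a limit of 1 behaves like NeverRetryPolicy: the first attempt runs, and a failure goes straight to recovery.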
@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
    def factory = /** configure factory **/

    // configure the listener container factory with retry support
    factory.setRetryTemplate(retryTemplate);
    factory.setRecoveryCallback(context -> {
        log.error("RetryPolicy limit has been exceeded! You should really handle this better.");
        return null;
    });