Skip to content

Instantly share code, notes, and snippets.

@cherniag
Created August 4, 2021 15:36
Show Gist options
  • Save cherniag/513bf7b70d411df9a5478937b01ea989 to your computer and use it in GitHub Desktop.
Save cherniag/513bf7b70d411df9a5478937b01ea989 to your computer and use it in GitHub Desktop.
flux retry
public static void main(String[] args) throws InterruptedException {
Flux.fromIterable(List.of(1001, 1002, 2001, 3303, 1003, 2002))
.flatMap(EventsBuilder::get)
.subscribe(i -> System.out.println(new Date() + " " + Thread.currentThread().getName() + " on next " + i), System.err::println, () -> System.out.println("complete"));
Thread.sleep(10000);
}
static int count = 0;
private static Mono<String> get(int i) {
return Mono.just(i)
.doOnNext(el -> System.out.println(new Date() + " " + Thread.currentThread().getName() + " before delay " + el))
.delayElement(Duration.ofMillis(i))
.publishOn(Schedulers.elastic())
.doOnNext(el -> System.out.println(new Date() + " " + Thread.currentThread().getName() + " after delay " + el))
.map(el -> {
if (++count % 3 == 0) {
System.out.println(new Date() + " " + Thread.currentThread().getName() + " FAILED "+ i + " and " + count);
throw new RuntimeException("Error on " + i + " and " + count);
}
return el;
})
.map(String::valueOf)
.retryWhen(Retry.backoff(3, Duration.ofMillis(1000))
.doBeforeRetry(signal -> System.out.println(new Date() + " " + Thread.currentThread().getName() + " before retry: " +signal.totalRetries() + " cause: " + signal.failure().getMessage()))
.filter(ex -> ex instanceof RuntimeException));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment