Created
August 4, 2021 15:36
-
-
Save cherniag/513bf7b70d411df9a5478937b01ea989 to your computer and use it in GitHub Desktop.
flux retry
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) throws InterruptedException { | |
Flux.fromIterable(List.of(1001, 1002, 2001, 3303, 1003, 2002)) | |
.flatMap(EventsBuilder::get) | |
.subscribe(i -> System.out.println(new Date() + " " + Thread.currentThread().getName() + " on next " + i), System.err::println, () -> System.out.println("complete")); | |
Thread.sleep(10000); | |
} | |
static int count = 0; | |
private static Mono<String> get(int i) { | |
return Mono.just(i) | |
.doOnNext(el -> System.out.println(new Date() + " " + Thread.currentThread().getName() + " before delay " + el)) | |
.delayElement(Duration.ofMillis(i)) | |
.publishOn(Schedulers.elastic()) | |
.doOnNext(el -> System.out.println(new Date() + " " + Thread.currentThread().getName() + " after delay " + el)) | |
.map(el -> { | |
if (++count % 3 == 0) { | |
System.out.println(new Date() + " " + Thread.currentThread().getName() + " FAILED "+ i + " and " + count); | |
throw new RuntimeException("Error on " + i + " and " + count); | |
} | |
return el; | |
}) | |
.map(String::valueOf) | |
.retryWhen(Retry.backoff(3, Duration.ofMillis(1000)) | |
.doBeforeRetry(signal -> System.out.println(new Date() + " " + Thread.currentThread().getName() + " before retry: " +signal.totalRetries() + " cause: " + signal.failure().getMessage())) | |
.filter(ex -> ex instanceof RuntimeException)); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment