Created
March 5, 2019 01:30
-
-
Save haigopi/3fb1904dd9154d350d4f9f083d3bafbe to your computer and use it in GitHub Desktop.
Reactor Kafka Perfect Retry
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(topic)) | |
.addAssignListener(partitions -> log.info("Partitions Assigned {}", AOUtilities.printPartitions(partitions))) | |
.addRevokeListener(partitions -> log.info("Partitions Revoked {}", partitions)) | |
// .assignment(Collections.singleton(new TopicPartition(topic, 5))) // <-- ** EASY TEST ONLY ** | |
.commitInterval(Duration.ZERO) | |
.commitBatchSize(0)// <-- interval and batch size: can dramatically improve the performance. Make sure to test the retries as a side effect. | |
.consumerProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); | |
KafkaReceiver<Integer, String> receiver = KafkaReceiver.create(options); | |
// Group at first | |
Flux<GroupedFlux<TopicPartition, ReceiverRecord<Integer, String>>> f = Flux.defer(receiver::receive) | |
.groupBy(m -> m.receiverOffset().topicPartition()); | |
// Flow each and handle retry -- Failures causes the thread to slow down on Mono::delay | |
Flux f1 = f.publishOn(scheduler).flatMap(r -> r.publishOn(scheduler).concatMap(b -> | |
Flux.just(b) | |
.concatMap(a -> eventConsumer.consumeEvent(a).flatMap(m -> | |
m.receiverOffset().commit() | |
)) | |
.retryWhen(companion -> companion | |
.doOnNext(s -> log.info(" --> Exception: {} at: {} " + s.getMessage(), LocalTime.now())) | |
.zipWith(Flux.range(1, numRetries), (error, index) -> { | |
if (index < numRetries) { | |
log.info(" --> Retying {} times: {} ", index, b); | |
return index; | |
} else { | |
log.info(" --> Retries Exhausted: {} - {}. Commit and proceed to next", index, b); | |
b.receiverOffset().commit(); | |
throw Exceptions.propagate(error); | |
} | |
}) | |
.flatMap(index -> Mono.delay(Duration.ofSeconds((long) Math.pow(1.5, index - 1) * 10))) | |
.doOnNext(s -> log.info(" --> Retried at: {} ", LocalTime.now())) | |
)) | |
); | |
// Restart the whole pipeline once the retry attempts for a record are exhausted.
// The 5 second pause gives pending commits time to finish (500 ms is generally enough).
f1.doOnError(failure -> {
            log.info("Moving to next:{}", failure);
            try {
                Thread.sleep(5000); // Make it configurable
            } catch (InterruptedException e) {
                // The pause was cut short, so the restart below happens right away.
                // Restore the interrupt flag so the owning thread can still observe it.
                Thread.currentThread().interrupt();
                log.info("Interrupted while pausing before restart", e);
            }
        }
).retry().subscribe(); // Subscribe to start emitting the events.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment