Skip to content

Instantly share code, notes, and snippets.

@haigopi
Created March 5, 2019 01:30
Show Gist options
  • Save haigopi/3fb1904dd9154d350d4f9f083d3bafbe to your computer and use it in GitHub Desktop.
Save haigopi/3fb1904dd9154d350d4f9f083d3bafbe to your computer and use it in GitHub Desktop.
Reactor Kafka Perfect Retry
ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(topic))
.addAssignListener(partitions -> log.info("Partitions Assigned {}", AOUtilities.printPartitions(partitions)))
.addRevokeListener(partitions -> log.info("Partitions Revoked {}", partitions))
// .assignment(Collections.singleton(new TopicPartition(topic, 5))) // <-- ** EASY TEST ONLY **
.commitInterval(Duration.ZERO)
.commitBatchSize(0)// <-- interval and batch size: can dramatically improve the performance. Make sure to test the retries as a side effect.
.consumerProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaReceiver<Integer, String> receiver = KafkaReceiver.create(options);
// Group at first
Flux<GroupedFlux<TopicPartition, ReceiverRecord<Integer, String>>> f = Flux.defer(receiver::receive)
.groupBy(m -> m.receiverOffset().topicPartition());
// Flow each and handle retry -- Failures causes the thread to slow down on Mono::delay
Flux f1 = f.publishOn(scheduler).flatMap(r -> r.publishOn(scheduler).concatMap(b ->
Flux.just(b)
.concatMap(a -> eventConsumer.consumeEvent(a).flatMap(m ->
m.receiverOffset().commit()
))
.retryWhen(companion -> companion
.doOnNext(s -> log.info(" --> Exception: {} at: {} " + s.getMessage(), LocalTime.now()))
.zipWith(Flux.range(1, numRetries), (error, index) -> {
if (index < numRetries) {
log.info(" --> Retying {} times: {} ", index, b);
return index;
} else {
log.info(" --> Retries Exhausted: {} - {}. Commit and proceed to next", index, b);
b.receiverOffset().commit();
throw Exceptions.propagate(error);
}
})
.flatMap(index -> Mono.delay(Duration.ofSeconds((long) Math.pow(1.5, index - 1) * 10)))
.doOnNext(s -> log.info(" --> Retried at: {} ", LocalTime.now()))
))
);
// Restart when the retry attempts exhausted, Sleep 5 seconds for the pending commits to finish. (500 mills at max is enough in general)
f1.doOnError(a -> {
log.info("Moving to next:{}", a);
try {
Thread.sleep(5000); // Make it configurable
} catch (InterruptedException e) {
// Causes the restart
e.printStackTrace();
}
}
).retry().subscribe(); // Subscribe to start emitting the events.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment