@haigopi
Created December 11, 2018 18:18
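{
    // Dedicated elastic scheduler for per-partition processing (10 s idle TTL, daemon threads).
    scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);

    // Subscribe to the topic. A zero commit interval and a zero batch size turn off
    // periodic and size-based commits, so offsets are committed only by the explicit commit() below.
    ReceiverOptions<Integer, String> options = receiverOptions.subscription(Collections.singleton(topic))
            .addAssignListener(partitions -> log.info("Partitions Assigned {}", printPartitions(partitions)))
            .addRevokeListener(partitions -> log.info("Partitions Revoked {}", printPartitions(partitions)))
            //.assignment(Collections.singleton(new TopicPartition(topic, 9))) // <-- ** EASY TEST ONLY **
            .commitInterval(Duration.ZERO)
            .commitBatchSize(0);
    KafkaReceiver<Integer, String> receiver = KafkaReceiver.create(options);

    // defer() calls receiver.receive() anew on each (re)subscription, e.g. after a retry.
    disposable = Flux.defer(receiver::receive)
            // One inner Flux per topic partition.
            .groupBy(m -> m.receiverOffset().topicPartition())
            .flatMap(partitionFlux ->
                    partitionFlux.publishOn(scheduler)
                            // concatMap keeps per-partition order: each offset is committed
                            // only after its message has been consumed.
                            .concatMap(m -> consumeMessage(m)
                                    .thenEmpty(m.receiverOffset().commit())))
            // Retry with exponential backoff (from 2 s up to 2 h), at most numRetries times.
            .retryBackoff(numRetries, Duration.ofSeconds(2), Duration.ofHours(2))
            // Once the backoff retries are exhausted: report the error, then resubscribe indefinitely.
            .doOnError(err -> handleError(err))
            .retry()
            .doOnCancel(() -> close())
            .subscribe();
}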