package com.softwaremill.test;

import akka.Done;
import akka.actor.ActorSystem;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.stream.OverflowStrategy;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
public class Test {

    private final ActorSystem actorSystem;
    private final ConsumerSettings<String, String> kafkaConsumerSettings;

    public Test(ActorSystem actorSystem) {
        this.actorSystem = actorSystem;
        kafkaConsumerSettings =
            ConsumerSettings.create(actorSystem, new StringDeserializer(), new StringDeserializer())
                .withBootstrapServers("localhost:9092")
                .withGroupId("test")
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }
    private CompletableFuture<URI> lookupNodeInRedis(String data) {
        return null; // stub: look up, in a Redis cache, the URI of the node that should handle this data
    }

    private CompletableFuture<String> processOnServer(URI uri, String data) {
        return null; // stub: send an HTTP request to the looked-up node
    }
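    // A hedged sketch (not part of the original gist) of how the Redis stub might be
    // implemented, assuming the Lettuce async client and a hypothetical "node-for:" key
    // scheme; RedisFuture implements CompletionStage, so it converts directly:
    //
    //   private final RedisAsyncCommands<String, String> redis =
    //       RedisClient.create("redis://localhost").connect().async();
    //
    //   private CompletableFuture<URI> lookupNodeInRedis(String data) {
    //       return redis.get("node-for:" + data)   // hypothetical key scheme
    //           .thenApply(URI::create)
    //           .toCompletableFuture();
    //   }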
    /**
     * A short example of using a reactive streams implementation in practice (here: akka-streams), based on a
     * real-world use-case.
     * The flow is quite simple:
     * - read data from a Kafka topic
     * - create a buffer of 4k messages, to smooth out processing in case of spikes in Kafka data / slowdowns in
     *   Redis/HTTP
     * - obtain, from a Redis cache, the URI to which the given data should be sent
     * - send the data to that URI for further processing
     * - commit the offsets of processed messages to Kafka
     *
     * What I think is good about this code:
     * - all concurrency/threading is hidden - I only specify how many elements should be processed in parallel
     * - no explicit locks, mutexes or blocking operations - fewer chances for a deadlock or other bugs
     * - ordering in mapAsync is preserved (which is important for committing to Kafka)
     * - in case of an upstream error, the failure is propagated to all stages, and each has a chance to
     *   release its resources (e.g. Kafka connections)
     * - execution is memory-bounded (I can put an upper bound on the number of elements that will be read into
     *   memory at any given time). That is, e.g. slower HTTP processing, or slower Kafka commits, will cause the
     *   whole flow to slow down, without exhausting memory.
     * - there are few technical details, and the business logic stands out clearly
     */
    public void run() {
        CommitterSettings committerSettings = CommitterSettings.create(actorSystem);
        // toMat only describes the stream; run() materializes it (Akka 2.6+: the ActorSystem
        // provides the materializer). The DrainingControl can later be used to stop consuming
        // and drain in-flight messages before shutdown.
        Consumer.DrainingControl<Done> control =
            Consumer.committableSource(kafkaConsumerSettings, Subscriptions.topics("topic"))
                .buffer(4096, OverflowStrategy.backpressure())
                .mapAsync(16, msg -> lookupNodeInRedis(msg.record().value())
                    .thenApply(uri -> new UriDataOffset(uri, msg.record().value(), msg.committableOffset())))
                .mapAsync(8, uriDataOffset -> processOnServer(uriDataOffset.uri, uriDataOffset.data)
                    .thenApply(done -> uriDataOffset.offset))
                .toMat(Committer.sink(committerSettings.withMaxInterval(Duration.ofSeconds(1))),
                    Consumer::createDrainingControl)
                .run(actorSystem);
    }
    // Simple immutable holder carrying the URI, payload and Kafka offset between stream stages.
    private static class UriDataOffset {
        public final URI uri;
        public final String data;
        public final ConsumerMessage.CommittableOffset offset;

        public UriDataOffset(URI uri, String data, ConsumerMessage.CommittableOffset offset) {
            this.uri = uri;
            this.data = data;
            this.offset = offset;
        }
    }
}
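
// A minimal usage sketch (not part of the original gist): assumes Akka 2.6+, where the
// ActorSystem itself acts as the materializer provider passed to run() above.
class TestMain {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("kafka-flow-example");
        new Test(system).run();
    }
}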