@adamw
Created May 20, 2020 11:03
package com.softwaremill.test;

import akka.actor.ActorSystem;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.stream.OverflowStrategy;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class Test {
    private final ActorSystem actorSystem;
    private final ConsumerSettings<String, String> kafkaConsumerSettings;

    public Test(ActorSystem actorSystem) {
        this.actorSystem = actorSystem;
        kafkaConsumerSettings =
            ConsumerSettings.create(actorSystem, new StringDeserializer(), new StringDeserializer())
                .withBootstrapServers("localhost:9092")
                .withGroupId("test")
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }

    private CompletableFuture<URI> lookupNodeInRedis(String data) {
        return null; // look up, in a Redis cache, the URI of the node that should process this data
    }
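
    // Hypothetical sketch (not part of the original gist) of what the Redis lookup could
    // look like with the Lettuce async client; the "node:" key prefix and storing node
    // addresses as plain strings are assumptions. RedisFuture is a CompletionStage, so the
    // lookup stays non-blocking and composes with the rest of the flow. In real code the
    // connection would be created once and reused rather than passed per call.
    private CompletableFuture<URI> lookupNodeInRedisSketch(
            io.lettuce.core.api.async.RedisAsyncCommands<String, String> redis, String data) {
        return redis.get("node:" + data)  // asynchronously fetch the cached node address
            .thenApply(URI::create)       // turn the cached string into a URI
            .toCompletableFuture();
    }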

    private CompletableFuture<String> processOnServer(URI uri, String data) {
        return null; // send an HTTP request
    }
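
    // Hypothetical sketch (not part of the original gist) of the HTTP call, using the JDK 11+
    // java.net.http.HttpClient; the POST body and the assumption that the server replies with
    // a string are illustrative only. sendAsync returns a CompletableFuture, so no stream
    // thread is blocked while the request is in flight. In real code the client would be
    // created once and reused.
    private CompletableFuture<String> processOnServerSketch(URI uri, String data) {
        java.net.http.HttpClient client = java.net.http.HttpClient.newHttpClient();
        java.net.http.HttpRequest request = java.net.http.HttpRequest.newBuilder(uri)
            .POST(java.net.http.HttpRequest.BodyPublishers.ofString(data))
            .build();
        return client.sendAsync(request, java.net.http.HttpResponse.BodyHandlers.ofString())
            .thenApply(java.net.http.HttpResponse::body);
    }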

    /**
     * A short example of using a reactive streams implementation (here: akka-streams) in practice, based on a
     * real-world use-case.
     *
     * The flow is quite simple:
     * - read data from a Kafka topic
     * - create a buffer of 4k messages, to smooth out processing in case of spikes in the Kafka data or
     *   slowdowns in Redis/HTTP
     * - obtain, from a Redis cache, the URI to which the given data should be sent
     * - send the data to that URI for further processing
     * - commit the offsets of processed messages to Kafka
     *
     * What I think is good about this code:
     * - all concurrency/threading is hidden - I only specify how many elements should be processed in parallel
     * - no explicit locks, mutexes or blocking operations - fewer chances for a deadlock or other bugs
     * - ordering in mapAsync is preserved (which is important for committing to Kafka)
     * - in case of an upstream error, it is propagated to all stages, so they all have a chance to
     *   release their resources (e.g. Kafka connections)
     * - execution is memory-bound (I can put an upper bound on the number of elements that will be read into
     *   memory at any given time). That is, slower HTTP processing or slower Kafka commits will cause the
     *   whole flow to slow down, without exhausting memory.
     * - there are few technical details, and the business logic is nicely fleshed out
     */
    public void run() {
        CommitterSettings committerSettings = CommitterSettings.create(actorSystem);
        Consumer.committableSource(kafkaConsumerSettings, Subscriptions.topics("topic"))
            .buffer(4096, OverflowStrategy.backpressure())
            .mapAsync(16, msg -> lookupNodeInRedis(msg.record().value())
                .thenApply(uri -> new UriDataOffset(uri, msg.record().value(), msg.committableOffset())))
            .mapAsync(8, uriDataOffset -> processOnServer(uriDataOffset.uri, uriDataOffset.data)
                .thenApply(done -> uriDataOffset.offset))
            .toMat(Committer.sink(committerSettings.withMaxInterval(Duration.ofSeconds(1))),
                Consumer::createDrainingControl)
            // materialize and start the stream (Akka 2.6+); the resulting DrainingControl
            // could be kept to drain and stop the stream gracefully on shutdown
            .run(actorSystem);
    }

    private static class UriDataOffset {
        public final URI uri;
        public final String data;
        public final ConsumerMessage.CommittableOffset offset;

        public UriDataOffset(URI uri, String data, ConsumerMessage.CommittableOffset offset) {
            this.uri = uri;
            this.data = data;
            this.offset = offset;
        }
    }
}
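
A minimal usage sketch, not part of the original gist: it assumes a Kafka broker is reachable at localhost:9092 (as configured above), creates an ActorSystem and starts the stream.

package com.softwaremill.test;

import akka.actor.ActorSystem;

public class TestMain {
    public static void main(String[] args) {
        // Create the actor system that will run the stream, then start the flow.
        ActorSystem actorSystem = ActorSystem.create("kafka-to-http");
        new Test(actorSystem).run();
        // The stream keeps running until the actor system is terminated,
        // e.g. via actorSystem.terminate().
    }
}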