Integrating the Helidon Kafka client with FS2 Streams using the interop flow package
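Helidon's KafkaPublisher (aliased below as HelidonKafkaConsumer, because it is a Reactive Streams Publisher of the records it consumes) is bridged to java.util.concurrent.Flow with FlowAdapters, and fs2.interop.flow.fromPublisher then subscribes FS2 to it, yielding a back-pressured Stream[IO, KafkaMessage[String, String]].

The sbt dependencies this file needs are sketched below (coordinates from memory — verify them and pick versions appropriate for your build):

libraryDependencies ++= Seq(
  "org.typelevel" %% "cats-effect" % "<version>",
  "co.fs2" %% "fs2-core" % "<version>", // fs2.interop.flow ships in fs2-core in fs2 3.x
  "io.helidon.messaging.kafka" % "helidon-messaging-kafka" % "<version>",
  "org.reactivestreams" % "reactive-streams" % "<version>" // provides FlowAdapters
)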
import cats.syntax.parallel.*
import cats.effect.*
import fs2.*
import fs2.interop.flow.*
import io.helidon.config.Config
import io.helidon.messaging.connectors.kafka.KafkaConfigBuilder.AutoOffsetReset
import io.helidon.messaging.connectors.kafka.{KafkaConnector as HelidonKafkaConnector, KafkaMessage, KafkaPublisher as HelidonKafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import org.reactivestreams.FlowAdapters

import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.duration.*
object Example extends IOApp.Simple:

  val subConfig: Config =
    HelidonKafkaConnector
      .configBuilder()
      .bootstrapServers("localhost:9092")
      .groupId("example-group-id")
      .topic("example-topic")
      .batchSize(500)
      .enableAutoCommit(false) // with auto-commit off, offsets are committed when messages are ack()ed
      .autoOffsetReset(AutoOffsetReset.EARLIEST)
      .acks("1")
      .keyDeserializer(classOf[StringDeserializer])
      .valueDeserializer(classOf[StringDeserializer])
      .build()
  def mkScheduler(): Resource[IO, ScheduledExecutorService] =
    val create =
      IO.delay:
        Executors.newSingleThreadScheduledExecutor: r =>
          val t = new Thread(r)
          t.setName("io-scheduler")
          t.setDaemon(true)
          t.setPriority(Thread.MAX_PRIORITY)
          t
    val cleanup = (executor: ScheduledExecutorService) => IO.delay(executor.shutdown())
    Resource.make(create)(cleanup)
  def mkHelidonKafkaConsumer(scheduler: ScheduledExecutorService): Resource[IO, HelidonKafkaConsumer[String, String]] =
    val create =
      IO.delay:
        HelidonKafkaConsumer
          .builder[String, String]()
          .scheduler(scheduler)
          .config(subConfig)
          .build()
    val cleanup = (consumer: HelidonKafkaConsumer[String, String]) => IO.delay(consumer.stop())
    Resource.make(create)(cleanup)
  // Helidon's KafkaPublisher is a Reactive Streams Publisher of the records it consumes,
  // so we adapt it to a java.util.concurrent.Flow.Publisher and subscribe FS2 to it
  val consumerStream: Stream[IO, KafkaMessage[String, String]] =
    Stream
      .resource(mkScheduler().flatMap(mkHelidonKafkaConsumer))
      .flatMap: rsConsumer =>
        // A chunk size of 1 keeps the stream as current as possible with the latest message on Kafka;
        // a larger chunk size delays processing because the implementation waits for the whole chunk
        // to fill before emitting it on the Stream
        fromPublisher(chunkSize = 100): fs2Subscriber =>
          val flowConsumer = FlowAdapters.toFlowPublisher(rsConsumer)
          IO.delay(flowConsumer.subscribe(fs2Subscriber))
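
  // A minimal sketch (not part of the original gist): processing a message before acknowledging it.
  // KafkaMessage extends MicroProfile Reactive Messaging's Message, so getPayload() yields the
  // deserialized value; ack() returns a CompletionStage that we lift into IO
  def handle(message: KafkaMessage[String, String]): IO[Unit] =
    IO.println(s"received: ${message.getPayload()}") *>
      IO.fromCompletionStage(IO.delay(message.ack())).void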
  override val run =
    consumerStream
      .prefetchN(2)
      .chunks
      // .prefetchN(32)
      // .groupWithin(512, 5.seconds)
      .evalMap: (messages: Chunk[KafkaMessage[String, String]]) =>
        for
          start <- IO.monotonic
          _ <- messages.parTraverse(message => IO.fromCompletionStage(IO.delay(message.ack())))
          end <- IO.monotonic
          _ <- IO.println(s"Processed chunk of size ${messages.size} in ${(end - start).toMillis}ms")
        yield ()
      .compile
      .drain
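
// An alternative `run` (a sketch based on the commented-out lines above, not part of the original
// gist): batch acknowledgements across time windows with groupWithin, trading a little latency
// for fewer, larger batches. 512 and 5.seconds are illustrative values
object BatchedExample extends IOApp.Simple:
  override val run =
    Example.consumerStream
      .groupWithin(512, 5.seconds)
      .evalMap: messages =>
        messages.parTraverse(message => IO.fromCompletionStage(IO.delay(message.ack()))).void
      .compile
      .drain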