Skip to content

Instantly share code, notes, and snippets.

@calvinlfer
Created December 11, 2024 20:50
Show Gist options
  • Save calvinlfer/0680bba281d7e4ee361109f87c1d227c to your computer and use it in GitHub Desktop.
Save calvinlfer/0680bba281d7e4ee361109f87c1d227c to your computer and use it in GitHub Desktop.
Integrating the Helidon Kafka client with FS2 Streams using the interop flow package
import cats.syntax.parallel.*
import cats.effect.*
import fs2.*
import fs2.interop.flow.*
import io.helidon.config.Config
import io.helidon.messaging.connectors.kafka.KafkaConfigBuilder.AutoOffsetReset
import io.helidon.messaging.connectors.kafka.{KafkaConnector as HelidonKafkaConnector, KafkaMessage, KafkaPublisher as HelidonKafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import org.reactivestreams.FlowAdapters
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.duration.*
/** Example application that reads a Kafka topic through Helidon's reactive Kafka publisher,
  * bridges it into an FS2 stream via the `fs2.interop.flow` package, and acknowledges every
  * message it receives, printing how long each chunk took to acknowledge.
  */
object Example extends IOApp.Simple:

  /** Helidon Kafka connector settings used by the consumer built in [[mkHelidonKafkaConsumer]].
    *
    * Auto-commit is switched off here; messages are acknowledged explicitly in [[run]].
    */
  val subConfig: Config =
    HelidonKafkaConnector
      .configBuilder()
      .bootstrapServers("localhost:9092")
      .groupId("example-group-id")
      .topic("example-topic")
      .batchSize(500)
      .enableAutoCommit(false)
      .autoOffsetReset(AutoOffsetReset.EARLIEST)
      .acks("1")
      .keyDeserializer(classOf[StringDeserializer])
      .valueDeserializer(classOf[StringDeserializer])
      .build()

  /** Single-threaded scheduled executor wrapped in a `Resource`.
    *
    * The worker thread is a daemon named "io-scheduler" running at maximum priority.
    * Releasing the resource calls `shutdown()` on the executor.
    */
  def mkScheduler(): Resource[IO, ScheduledExecutorService] =
    // Thread factory body: builds the one worker thread the executor will use.
    def schedulerThread(task: Runnable): Thread =
      val thread = new Thread(task)
      thread.setName("io-scheduler")
      thread.setDaemon(true)
      thread.setPriority(Thread.MAX_PRIORITY)
      thread

    val acquire: IO[ScheduledExecutorService] =
      IO.delay(Executors.newSingleThreadScheduledExecutor(task => schedulerThread(task)))

    Resource.make(acquire)(executor => IO.delay(executor.shutdown()))

  /** Helidon Kafka publisher (String keys and values) wrapped in a `Resource`.
    *
    * @param scheduler executor handed to the Helidon publisher builder
    * @return a resource that builds the publisher from [[subConfig]] on acquisition and calls
    *         `stop()` on it when released
    */
  def mkHelidonKafkaConsumer(
    scheduler: ScheduledExecutorService
  ): Resource[IO, HelidonKafkaConsumer[String, String]] =
    val acquire: IO[HelidonKafkaConsumer[String, String]] =
      IO.delay:
        HelidonKafkaConsumer
          .builder[String, String]()
          .scheduler(scheduler)
          .config(subConfig)
          .build()

    Resource.make(acquire)(publisher => IO.delay(publisher.stop()))

  /** Stream of Kafka messages backed by the scheduler and publisher resources above. */
  val consumerStream: Stream[IO, KafkaMessage[String, String]] =
    // Adapts the Reactive Streams publisher to a java.util.concurrent.Flow publisher and
    // subscribes the FS2-provided subscriber to it.
    def subscribeTo(rsConsumer: HelidonKafkaConsumer[String, String]): Stream[IO, KafkaMessage[String, String]] =
      // Chunk-size trade-off (as noted by the original author): a chunk size of 1 keeps the
      // stream up to date with the latest Kafka message, whereas a larger value delays
      // processing because the whole chunk has to fill up before it is emitted downstream.
      fromPublisher(chunkSize = 100): fs2Subscriber =>
        val flowPublisher = FlowAdapters.toFlowPublisher(rsConsumer)
        IO.delay(flowPublisher.subscribe(fs2Subscriber))

    val publisherResource = mkScheduler().flatMap(mkHelidonKafkaConsumer)

    Stream
      .resource(publisherResource)
      .flatMap(rsConsumer => subscribeTo(rsConsumer))

  /** Program entry point: drains [[consumerStream]], acknowledging each chunk of messages in
    * parallel and printing the chunk size together with the elapsed acknowledgement time.
    */
  override val run: IO[Unit] =
    // Acknowledges every message of the chunk in parallel, timing the whole operation.
    def ackChunk(messages: Chunk[KafkaMessage[String, String]]): IO[Unit] =
      val ackAll = messages.parTraverse(message => IO.fromCompletionStage(IO.delay(message.ack())))
      IO.monotonic.flatMap: start =>
        (ackAll *> IO.monotonic).flatMap: end =>
          IO.println(s"Processed chunk of size ${messages.size} in ${(end - start).toMillis}ms")

    // Alternatives the original author left disabled after `.chunks`:
    //   .prefetchN(32)
    //   .groupWithin(512, 5.seconds)
    consumerStream
      .prefetchN(2)
      .chunks
      .evalMap(messages => ackChunk(messages))
      .compile
      .drain
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment