Skip to content

Instantly share code, notes, and snippets.

@notxcain
Last active May 11, 2018 11:45
Show Gist options
  • Select an option

  • Save notxcain/10ef31f2d3ffba8a532af8e0395afeb0 to your computer and use it in GitHub Desktop.

Select an option

Save notxcain/10ef31f2d3ffba8a532af8e0395afeb0 to your computer and use it in GitHub Desktop.
Fs2 Kafka Consumer
import java.util
import java.util.{Collections, UUID}
import cats.effect.{Async, Timer}
import io.evotor.webhooks.common.Committable
import fs2._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.Deserializer
import scala.concurrent.blocking
import scala.collection.JavaConverters._
import scala.concurrent.duration.{FiniteDuration, _}
import scala.util.Try
object KafkaStream {
final class Builder[F[_]] {
def apply[K, V](bootstrapServers: Set[String],
groupId: String,
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V],
properties: Map[String, String] = Map.empty,
pollInterval: FiniteDuration = 50.millis)(
implicit F: Async[F],
timer: Timer[F]): KafkaStream[F, K, V] = {
val defaults = Map(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers
.mkString(","),
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.CLIENT_ID_CONFIG -> UUID.randomUUID().toString,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
)
new KafkaStream(defaults ++ properties,
keyDeserializer,
valueDeserializer,
pollInterval)
}
}
def apply[F[_]]: Builder[F] = new Builder[F]
}
final class KafkaStream[F[_], K, V] private (
properties: Map[String, String],
keyDeserializer: Deserializer[K],
valueDeserializer: Deserializer[V],
pollInterval: FiniteDuration)(implicit F: Async[F], timer: Timer[F]) {
private val javaProps = properties.foldLeft(new java.util.Properties) {
case (p, (k, v)) => p.put(k, v); p
}
private val acquire: F[KafkaConsumer[K, V]] = F.suspend {
F.fromTry {
blocking {
Try {
new KafkaConsumer[K, V](
javaProps,
keyDeserializer,
valueDeserializer
)
}
}
}
}
private def release(consumer: KafkaConsumer[K, V]): F[Unit] =
F.suspend {
F.fromTry(Try(blocking(consumer.close())))
}
def subscribe(topic: String): Stream[F, Committable[F, ConsumerRecord[K, V]]] = {
def use(consumer: KafkaConsumer[K, V]): Stream[F, Committable[F, ConsumerRecord[K, V]]] =
Stream.suspend {
consumer.subscribe(Collections.singleton(topic))
val pollOnce = Stream
.eval(F.delay {
blocking(consumer.poll(0L)).iterator().asScala
})
.flatMap { records =>
Stream.fromIterator(records)
}
.map { record =>
val commit = F.async[Unit] { cb =>
val tp = new TopicPartition(record.topic(), record.partition())
val om = new OffsetAndMetadata(record.offset())
consumer.commitAsync(
Collections.singletonMap(tp, om),
new OffsetCommitCallback {
override def onComplete(
offsets: util.Map[TopicPartition, OffsetAndMetadata],
exception: Exception): Unit =
if (exception eq null) {
cb(Right(()))
} else {
cb(Left(exception))
}
}
)
}
Committable(commit, record)
}
def pollLoop: Stream[F, Committable[F, ConsumerRecord[K, V]]] =
pollOnce ++ Stream.eval(timer.sleep(pollInterval)).drain ++ pollLoop
pollLoop
}
Stream.bracket(acquire)(use, release)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment