Skip to content

Instantly share code, notes, and snippets.

@dsebban
Created August 20, 2018 08:35
Show Gist options
  • Save dsebban/46b5309225dd088b30d7d3f324c8e8f8 to your computer and use it in GitHub Desktop.
Save dsebban/46b5309225dd088b30d7d3f324c8e8f8 to your computer and use it in GitHub Desktop.
import scala.concurrent.duration._
import $ivy.`com.spinoco:fs2-kafka_2.12:0.4.0-M2`
import spinoco.protocol.kafka.{Broker, PartitionId, ProtocolVersion, TopicName}
import spinoco.fs2.kafka
import spinoco.fs2.kafka._
import cats.effect.IO
object Fs2KafkaClientResources {
import java.util.concurrent.Executors
import java.nio.channels.AsynchronousChannelGroup
import scala.concurrent.ExecutionContext
import cats.effect.{Concurrent, Timer}
import spinoco.protocol.kafka.{Broker, PartitionId, ProtocolVersion, TopicName}
implicit val _timer: Timer[IO] = IO.timer(ExecutionContext.global)
implicit val _concurrent: Concurrent[IO] = IO.ioConcurrentEffect(_timer)
implicit val AG: AsynchronousChannelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(8))
implicit lazy val logger: Logger[IO] = new Logger[IO] {
def log(level: Logger.Level.Value, msg: => String, throwable: Throwable): IO[Unit] =
IO { println(s"LOGGER: $level: $msg"); if (throwable != null) throwable.printStackTrace() }
}
}
import Fs2KafkaClientResources._
val kafkaTopicName = topic("testbp_xtoxt")
val part0 = partition(0)
kafka.client(
ensemble = Set(broker("localhost", port = 9092))
, protocol = ProtocolVersion.Kafka_0_10
, clientName = "1"
).flatMap { kc =>
kc.subscribe(kafkaTopicName, part0, HeadOffset)
}.take(1).compile.toList.unsafeRunSync.foreach(println)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment