@WadeWaldron
Last active March 18, 2016 19:30
KafkaConsumers-ReactiveKafka
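import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.duration._

// Akka Streams needs an implicit ActorSystem and materializer in scope.
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val kafka = new ReactiveKafka()

// Consume string keys and values from the "messages" topic,
// committing offsets every 5 seconds.
val properties = ConsumerProperties(
  bootstrapServers = "localhost:9092",
  topic = "messages",
  groupId = "ReactiveKafka",
  keyDeserializer = new StringDeserializer(),
  valueDeserializer = new StringDeserializer()
).commitInterval(5.seconds)

// Pairs a record publisher with a sink that commits processed offsets.
val consumer = kafka.consumeWithOffsetSink(properties)

// Print each record's value, then pass the record on so its offset is committed.
Source.fromPublisher(consumer.publisher)
  .map { record =>
    println(record.value())
    record
  }.runWith(consumer.offsetCommitSink)

// waitForCompletion() is not defined in this snippet; it stands in for
// whatever blocks until the stream should stop.
waitForCompletion()
system.terminate()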