Skip to content

Instantly share code, notes, and snippets.

@duanebester
Created December 2, 2018 17:04
Show Gist options
  • Select an option

  • Save duanebester/181244d0a4b68b23c71b47da72008f71 to your computer and use it in GitHub Desktop.

Select an option

Save duanebester/181244d0a4b68b23c71b47da72008f71 to your computer and use it in GitHub Desktop.
Kafka Akka Consumer
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.stream.{ActorMaterializer, Materializer}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}
/** Minimal Kafka consumer: streams records from the `test` topic and prints each one.
  *
  * Consumer settings are read from the `akka.kafka.consumer` section of the loaded
  * configuration, with both keys and values deserialized as strings. When the stream
  * finishes, successfully or with an error, the outcome is printed and the actor
  * system is terminated.
  */
object ConsumerApp extends App {

  // Implicit infrastructure needed to materialize and run the stream.
  implicit val system: ActorSystem = ActorSystem("consumer-sys")
  implicit val mat: Materializer = ActorMaterializer()
  implicit val ec: ExecutionContextExecutor = system.dispatcher

  // Build the consumer settings from the application configuration.
  val config = ConfigFactory.load()
  val consumerConfig = config.getConfig("akka.kafka.consumer")
  val consumerSettings: ConsumerSettings[String, String] =
    ConsumerSettings(
      consumerConfig,
      new StringDeserializer,
      new StringDeserializer
    )

  // Running the source with a printing sink yields the sink's completion future.
  val consume =
    Consumer
      .plainSource(consumerSettings, Subscriptions.topics("test"))
      .runWith(Sink.foreach(println))

  // Report how the stream ended, then shut the actor system down in either case.
  consume.onComplete { outcome =>
    val report = outcome match {
      case Success(_)   => "Done"
      case Failure(err) => err.toString
    }
    println(report)
    system.terminate()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment