Skip to content

Instantly share code, notes, and snippets.

@duanebester
Created December 2, 2018 17:05
Show Gist options
  • Save duanebester/25d076b901a7fa92d87f4bfe88f290e6 to your computer and use it in GitHub Desktop.
Save duanebester/25d076b901a7fa92d87f4bfe88f290e6 to your computer and use it in GitHub Desktop.
Kafka Akka Producer
import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, Materializer}
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}
/**
 * Small Akka Streams application that publishes the numbers 1 through 100,
 * rendered as strings, to the Kafka topic "test" and then shuts the actor
 * system down.
 *
 * Producer settings are read from the `akka.kafka.producer` section of the
 * configuration loaded by `ConfigFactory.load()`.
 */
object ProducerApp extends App {
  // Implicit infrastructure needed to materialize and run the stream.
  implicit val system: ActorSystem = ActorSystem("producer-sys")
  implicit val mat: Materializer = ActorMaterializer()
  implicit val ec: ExecutionContextExecutor = system.dispatcher

  val config = ConfigFactory.load()
  val producerConfig = config.getConfig("akka.kafka.producer")

  // Both key and value are serialized as strings.
  val producerSettings =
    ProducerSettings(producerConfig, new StringSerializer, new StringSerializer)

  /** Completes once the sink has finished with all 100 records (or fails with the stream's error). */
  val produce: Future[Done] = {
    // One record per number; only topic and value are supplied to the record.
    val records = Source(1 to 100).map { n =>
      new ProducerRecord[String, String]("test", n.toString)
    }
    records.runWith(Producer.plainSink(producerSettings))
  }

  // Report the outcome, then stop the actor system in either case so the
  // application does not keep running after the stream has ended.
  produce.onComplete { outcome =>
    val message = outcome match {
      case Success(_)   => "Done"
      case Failure(err) => err.toString
    }
    println(message)
    system.terminate()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment