Skip to content

Instantly share code, notes, and snippets.

@ElisaBaum
Last active August 27, 2020 08:06
Show Gist options
  • Save ElisaBaum/c9ec24ea044393b7ab9494219770f213 to your computer and use it in GitHub Desktop.
Save ElisaBaum/c9ec24ea044393b7ab9494219770f213 to your computer and use it in GitHub Desktop.
Kafka Producer/Consumer Example in Scala
// sbt build definition for the Kafka producer/consumer example project.
name := "KafkaExample"
version := "1.0"
scalaVersion := "2.12.2"
// The only dependency is the Kafka Java client; the single "%" means no Scala
// binary-version suffix is appended to the artifact name.
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.0"
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
/**
 * Minimal Kafka consumer that subscribes to a single topic and prints every
 * record it receives to standard output.
 *
 * The underlying [[KafkaConsumer]] is created and subscribed as soon as an
 * instance is constructed.
 *
 * @param brokers bootstrap server list passed to the Kafka client
 * @param topic   name of the topic to subscribe to
 * @param groupId consumer group id passed to the Kafka client
 */
class Consumer(brokers: String, topic: String, groupId: String) {

  val consumer: KafkaConsumer[String, String] =
    new KafkaConsumer[String, String](configuration)
  consumer.subscribe(List(topic).asJava)

  /** Builds the client properties: brokers, String key/value deserializers and group id. */
  private def configuration: Properties = {
    // getName (not getCanonicalName) is the form a class loader expects; the two
    // only differ for nested classes, but getName is the safe choice.
    val stringDeserializer = classOf[StringDeserializer].getName
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props
  }

  /**
   * Polls the subscribed topic forever, printing each record.
   *
   * This method never returns normally. If polling or printing throws, the
   * consumer is closed before the exception propagates so that the client is
   * not leaked; the instance must not be reused afterwards.
   */
  def receiveMessages(): Unit = {
    val pollTimeoutMs = 1000L
    try {
      while (true) {
        val records: ConsumerRecords[String, String] = consumer.poll(pollTimeoutMs)
        records.asScala.foreach(record => println(s"Received message: $record"))
      }
    } finally {
      // The loop can only be left via an exception; always release the client.
      consumer.close()
    }
  }
}
/**
 * Command-line entry point for the consumer.
 *
 * Expected arguments: `<brokers> <topic> <groupId>`. Extra arguments are
 * ignored. When fewer than three arguments are supplied a usage message is
 * printed to standard error and the process exits with status 1, instead of
 * failing with an ArrayIndexOutOfBoundsException.
 */
object Consumer extends App {
  if (args.length < 3) {
    Console.err.println("Usage: Consumer <brokers> <topic> <groupId>")
    sys.exit(1)
  }

  val consumer = new Consumer(brokers = args(0), topic = args(1), groupId = args(2))
  consumer.receiveMessages()
}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.io.StdIn
/**
 * Minimal Kafka producer that reads lines from standard input and sends each
 * one as a record to a single topic.
 *
 * The underlying [[KafkaProducer]] is created as soon as an instance is
 * constructed.
 *
 * @param topic   name of the topic the records are sent to
 * @param brokers bootstrap server list passed to the Kafka client
 */
class Producer(topic: String, brokers: String) {

  val producer: KafkaProducer[String, String] =
    new KafkaProducer[String, String](configuration)

  /** Builds the client properties: brokers and String key/value serializers. */
  private def configuration: Properties = {
    // getName (not getCanonicalName) is the form a class loader expects; the two
    // only differ for nested classes, but getName is the safe choice.
    val stringSerializer = classOf[StringSerializer].getName
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer)
    props
  }

  /**
   * Prompts on standard output, then sends every line read from standard input
   * to the topic (always with the record key "1") until the user types `exit`
   * or the input stream ends. The producer is closed afterwards, including
   * when sending or reading throws.
   */
  def sendMessages(): Unit = {
    println("Enter message (type exit to quit)")
    try {
      Iterator
        .continually(StdIn.readLine())
        // readLine() yields null at end of input (e.g. Ctrl-D or a closed pipe);
        // treat that like "exit" rather than failing with a NullPointerException.
        .takeWhile(line => line != null && line != "exit")
        .foreach { line =>
          producer.send(new ProducerRecord[String, String](topic, "1", line))
        }
    } finally {
      producer.close()
    }
  }
}
/**
 * Command-line entry point for the producer.
 *
 * Expected arguments: `<brokers> <topic>`. Extra arguments are ignored. When
 * fewer than two arguments are supplied a usage message is printed to standard
 * error and the process exits with status 1, instead of failing with an
 * ArrayIndexOutOfBoundsException.
 */
object Producer extends App {
  if (args.length < 2) {
    Console.err.println("Usage: Producer <brokers> <topic>")
    sys.exit(1)
  }

  val producer = new Producer(brokers = args(0), topic = args(1))
  producer.sendMessages()
}
@hosnids
Copy link

hosnids commented Mar 21, 2019

how to run it ? Thanks.

@nsuthison
Copy link

nsuthison commented Feb 26, 2020

how to run it ? Thanks.

sbt "run {broker} {topic} {groupId (for consumer)}"

Note: the values in {..} are arguments that you need to pass when starting the process.

@joshuaalbertgit
Copy link

Hello ElisaBaum, I was looking for a simple, working example like this, and it really helped a lot.
I greatly appreciate you sharing it here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment