Skip to content

Instantly share code, notes, and snippets.

@nsuthison
Forked from ElisaBaum/Consumer.scala
Last active February 24, 2020 17:59
Show Gist options
  • Save nsuthison/d2fa5c5bd67c36afa6658fe253ee7370 to your computer and use it in GitHub Desktop.
Save nsuthison/d2fa5c5bd67c36afa6658fe253ee7370 to your computer and use it in GitHub Desktop.
Kafka Producer/Consumer Example in Scala (revised to replace deprecated/outdated Kafka and Scala versions)
// sbt build definition for the Kafka producer/consumer example.
name := "KafkaExample"

version := "1.0"

scalaVersion := "2.13.1"

// kafka-clients is a plain Java artifact, hence `%` (no Scala version suffix).
libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "2.4.0"
)
import java.util.Properties
import java.time.Duration
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{
ConsumerConfig,
ConsumerRecords,
KafkaConsumer
}
import org.apache.kafka.common.serialization.StringDeserializer
/**
 * Minimal Kafka consumer that subscribes to a single topic and prints every
 * record it receives to standard output.
 *
 * The underlying [[KafkaConsumer]] is created and subscribed as part of
 * constructing this class.
 *
 * @param brokers value used for the consumer's bootstrap-servers setting
 * @param topic   the single topic to subscribe to
 * @param groupId value used for the consumer's group-id setting
 */
class Consumer(brokers: String, topic: String, groupId: String) {

  val consumer: KafkaConsumer[String, String] =
    new KafkaConsumer[String, String](configuration)
  consumer.subscribe(List(topic).asJava)

  /** Builds the client properties: bootstrap servers, String key/value deserializers and group id. */
  private def configuration: Properties = {
    // getName yields the binary class name, which is the form class loading expects.
    val deserializer = classOf[StringDeserializer].getName
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props
  }

  /**
   * Polls the subscribed topic in an endless loop (1000 ms timeout per poll)
   * and prints each record as `Received message: <record>`.
   *
   * The loop has no normal exit; it only ends when `poll` or the printing
   * throws. In that case the consumer is closed before the exception
   * propagates to the caller.
   */
  def receiveMessages(): Unit = {
    try {
      while (true) {
        val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(1000))
        records.asScala.foreach(record => println(s"Received message: $record"))
      }
    } finally {
      // Release the client's resources however the loop was left.
      consumer.close()
    }
  }
}
/**
 * Command-line entry point for the consumer.
 *
 * Expected arguments, in order: `<brokers> <topic> <groupId>`. Any extra
 * arguments are ignored.
 */
object Consumer extends App {
  // Fail with a readable usage message instead of an ArrayIndexOutOfBoundsException.
  if (args.length < 3) {
    Console.err.println("Usage: Consumer <brokers> <topic> <groupId>")
    sys.exit(1)
  }

  val consumer = new Consumer(brokers = args(0), topic = args(1), groupId = args(2))
  consumer.receiveMessages()
}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.io.StdIn
/**
 * Minimal Kafka producer that reads lines from standard input and sends each
 * one as a record to a single topic.
 *
 * The underlying [[KafkaProducer]] is created as part of constructing this class.
 *
 * @param topic   topic every record is sent to
 * @param brokers value used for the producer's bootstrap-servers setting
 */
class Producer(topic: String, brokers: String) {

  val producer: KafkaProducer[String, String] =
    new KafkaProducer[String, String](configuration)

  /** Builds the client properties: bootstrap servers and String key/value serializers. */
  private def configuration: Properties = {
    // getName yields the binary class name, which is the form class loading expects.
    val serializer = classOf[StringSerializer].getName
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializer)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer)
    props
  }

  /**
   * Prompts once, then sends every line read from standard input as a record
   * with the fixed key `"1"` until the line `exit` is entered or the input
   * ends.
   *
   * The producer is closed when the loop finishes, and also when reading or
   * sending throws, so this method can be used only once per instance.
   */
  def sendMessages(): Unit = {
    println("Enter message (type exit to quit)")
    try {
      Iterator
        .continually(StdIn.readLine())
        // readLine returns null at end of input; treat that like "exit"
        // instead of failing with a NullPointerException.
        .takeWhile(line => line != null && line != "exit")
        .foreach { line =>
          producer.send(new ProducerRecord[String, String](topic, "1", line))
        }
    } finally {
      producer.close()
    }
  }
}
/**
 * Command-line entry point for the producer.
 *
 * Expected arguments, in order: `<brokers> <topic>`. Any extra arguments are
 * ignored.
 */
object Producer extends App {
  // Fail with a readable usage message instead of an ArrayIndexOutOfBoundsException.
  if (args.length < 2) {
    Console.err.println("Usage: Producer <brokers> <topic>")
    sys.exit(1)
  }

  val producer = new Producer(brokers = args(0), topic = args(1))
  producer.sendMessages()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment