@dosht
Last active July 7, 2020 03:21
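import java.time.Duration
import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer

// On Scala 2.13+, scala.jdk.CollectionConverters._ is the non-deprecated equivalent.
import scala.collection.JavaConverters._

// Subscribes to `topic` and prints each record value until the process is stopped.
def consumeFromKafka(topic: String): Unit = {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9094")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  // "latest": when the group has no committed offset, start from newly produced records.
  props.put("auto.offset.reset", "latest")
  props.put("group.id", "consumer-group")
  val consumer = new KafkaConsumer[String, String](props)
  try {
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      // poll(Duration) needs kafka-clients 2.0+; it replaces the deprecated poll(long).
      val records = consumer.poll(Duration.ofMillis(1000)).asScala
      for (record <- records) println(record.value())
    }
  } finally consumer.close() // release sockets and leave the group if the loop throws
}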