Skip to content

Instantly share code, notes, and snippets.

@WadeWaldron
Last active March 18, 2016 19:04
Show Gist options
  • Save WadeWaldron/d05345b675a57665bf44 to your computer and use it in GitHub Desktop.
Save WadeWaldron/d05345b675a57665bf44 to your computer and use it in GitHub Desktop.
KafkaConsumers-Java
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import collection.JavaConverters._
val pollingTimeoutInMS = 100
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "JavaConsumer")
props.put("enable.auto.commit", "false")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List("messages").asJava)
while(notDone()) {
consumer.poll(pollingTimeoutInMS).iterator().asScala.foreach { record =>
println(record.value())
}
consumer.commitSync()
}
consumer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment