Last active
February 3, 2026 20:24
-
-
Save dacr/789efd4efa2ac92df4b327f7bfccd2d6 to your computer and use it in GitHub Desktop.
Kafka simple usage using java API / published by https://github.com/dacr/code-examples-manager #632dd41b-82c8-445c-b3bd-8f1d54280df0/a7910ad08a5db1fe42756cdd915171e587082835
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // summary : Kafka simple usage using java API | |
| // keywords : scala, scalatest, kafka | |
| // publish : gist | |
| // authors : David Crosson | |
| // license : Apache License Version 2.0 (https://www.apache.org/licenses/LICENSE-2.0.txt) | |
| // id : 632dd41b-82c8-445c-b3bd-8f1d54280df0 | |
| // created-on : 2020-01-10T16:35:02Z | |
| // managed-by : https://github.com/dacr/code-examples-manager | |
| // execution : scala ammonite script (http://ammonite.io/) - run as follows 'amm scriptname.sc' | |
| import $ivy.`org.apache.kafka:kafka-clients:2.8.0` | |
| import $ivy.`org.json4s::json4s-jackson:3.6.11` | |
| import $ivy.`org.json4s::json4s-ext:3.6.11` | |
| import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} | |
| import org.apache.kafka.common.serialization.StringDeserializer | |
| import java.time.Duration | |
| import scala.jdk.CollectionConverters._ | |
| import org.json4s.DefaultFormats | |
| import org.json4s.jackson.JsonMethods.{parse, pretty, render} | |
// json4s serialization formats made implicitly available to the rest of the script.
// The type is spelled out explicitly: implicit definitions without a declared type
// are fragile (inference may pick a narrower type) and are rejected by Scala 3.
implicit val formats: org.json4s.Formats = DefaultFormats.lossless
// Kafka consumer reading String keys and String values from the broker at 127.0.0.1:9092,
// as a member of the "default" consumer group.
val consumer = {
  val settings: Map[String, AnyRef] = Map(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG        -> "127.0.0.1:9092",
    ConsumerConfig.GROUP_ID_CONFIG                 -> "default",
    // if first restart then rewind from the beginning
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG        -> "earliest",
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG       -> "true",
    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG  -> "1000",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG   -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
  )
  // The Kafka client is configured through a java.util.Properties instance.
  val kafkaProperties = new java.util.Properties()
  settings.foreach { case (name, value) => kafkaProperties.put(name, value) }
  new KafkaConsumer[String, String](kafkaProperties)
}
// Subscribe to the "test" topic, then poll forever: each received record is printed
// (offset, key, raw value) followed by its value pretty-printed as JSON.
consumer.subscribe(List("test").asJava)
try {
  while (true) {
    val records = consumer.poll(Duration.ofMillis(100)).asScala
    for (record <- records) {
      printf("offset = %d, key = %s, value = %s%n", record.offset, record.key, record.value)
      // A value that is not valid JSON (or is null) must not abort the whole loop:
      // report it on stderr and carry on with the next record.
      try println(pretty(render(parse(record.value))))
      catch {
        case scala.util.control.NonFatal(err) =>
          System.err.println(s"offset = ${record.offset}, value is not valid JSON: ${err.getMessage}")
      }
    }
  }
} finally {
  // Reached only when the loop is left through an exception (e.g. a failing poll):
  // close the consumer so that its resources are released.
  consumer.close()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment