Last active
September 5, 2024 15:42
-
-
Save dacr/789efd4efa2ac92df4b327f7bfccd2d6 to your computer and use it in GitHub Desktop.
Kafka simple usage using java API / published by https://github.com/dacr/code-examples-manager #632dd41b-82c8-445c-b3bd-8f1d54280df0/54afddc723ec45a606589cf474d53c6f147cac9a
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// summary : Kafka simple usage using java API
// keywords : scala, scalatest, kafka
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 632dd41b-82c8-445c-b3bd-8f1d54280df0
// created-on : 2020-01-10T16:35:02Z
// managed-by : https://github.com/dacr/code-examples-manager
// execution : scala ammonite script (http://ammonite.io/) - run as follows 'amm scriptname.sc'
import $ivy.`org.apache.kafka:kafka-clients:2.8.0` | |
import $ivy.`org.json4s::json4s-jackson:3.6.11` | |
import $ivy.`org.json4s::json4s-ext:3.6.11` | |
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} | |
import org.apache.kafka.common.serialization.StringDeserializer | |
import java.time.Duration | |
import scala.jdk.CollectionConverters._ | |
import org.json4s.DefaultFormats | |
import org.json4s.jackson.JsonMethods.{parse, pretty, render} | |
// json4s formats picked up implicitly by the JSON helpers used further down.
// The type is spelled out explicitly: implicit definitions without a declared type
// are fragile during implicit resolution and are rejected by Scala 3.
implicit val formats: org.json4s.Formats = DefaultFormats.lossless
// Kafka consumer with String keys and String values, talking to a local broker
// as a member of the "default" consumer group.
val consumer = {
  val settings: Seq[(String, AnyRef)] = Seq(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "127.0.0.1:9092",
    ConsumerConfig.GROUP_ID_CONFIG -> "default",
    // start from the beginning of the topic when the group has no committed offset yet
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
    // offsets are committed automatically, once per second
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "true",
    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> "1000",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
  )
  val kafkaProperties = new java.util.Properties()
  settings.foreach { case (name, setting) => kafkaProperties.put(name, setting) }
  new KafkaConsumer[String, String](kafkaProperties)
}
// Subscribes to the "test" topic, then polls forever. Each record is printed as-is and,
// when its value is valid JSON, printed a second time as a pretty-rendered document.
import scala.util.{Failure, Success, Try}

consumer.subscribe(List("test").asJava)
try {
  while (true) {
    val records = consumer.poll(Duration.ofMillis(100)).asScala
    for (record <- records) {
      printf("offset = %d, key = %s, value = %s%n", record.offset, record.key, record.value)
      // A record value may be null or may not be JSON at all: report it and keep
      // consuming, instead of letting a single bad message terminate the loop.
      Option(record.value).map(value => Try(parse(value))) match {
        case Some(Success(json)) =>
          println(pretty(render(json)))
        case Some(Failure(error)) =>
          Console.err.println(s"offset = ${record.offset} : value is not valid JSON (${error.getMessage})")
        case None =>
          Console.err.println(s"offset = ${record.offset} : null value, nothing to parse")
      }
    }
  }
} finally {
  // Only reached when the loop is left through an exception (e.g. a failure raised
  // by poll); release the consumer's resources in that case.
  consumer.close()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment