Skip to content

Instantly share code, notes, and snippets.

@Amerousful
Last active May 27, 2023 14:57
Show Gist options
  • Save Amerousful/1f079c96f87356da067c79bdcfa19628 to your computer and use it in GitHub Desktop.
Save Amerousful/1f079c96f87356da067c79bdcfa19628 to your computer and use it in GitHub Desktop.
Kafka | Producer and Consumer with update `value`
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.time.Duration
import scala.jdk.CollectionConverters._
object ConsumerExample extends App {
import java.util.Properties
val READ_TOPIC = "input_topic"
val WRITE_TOPIC = "output_topic"
val c_props = new Properties()
c_props.put("bootstrap.servers", "localhost:29092")
c_props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
c_props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
c_props.put("group.id", "external-computer")
val p_props = new Properties()
p_props.put("bootstrap.servers", "localhost:29092")
p_props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
p_props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val consumer = new KafkaConsumer[String, String](c_props)
val producer = new KafkaProducer[String, String](p_props)
consumer.subscribe(java.util.Collections.singletonList(READ_TOPIC))
while (true) {
val records = consumer.poll(Duration.ofMillis(100))
for (record <- records.asScala) {
println(s"Read record ${record.key()} :: ${record.value()}")
val key = record.key()
val value = record.value() + "_new"
println(s"Write record $key :: $value")
val newRecord = new ProducerRecord(WRITE_TOPIC, key, value)
producer.send(newRecord)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment