Skip to content

Instantly share code, notes, and snippets.

@Amerousful
Created November 20, 2023 21:09
Show Gist options
  • Save Amerousful/b4e696d0428b940cd74c19d23c3dfa2e to your computer and use it in GitHub Desktop.
Save Amerousful/b4e696d0428b940cd74c19d23c3dfa2e to your computer and use it in GitHub Desktop.
// <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
// <dependency>
// <groupId>org.apache.kafka</groupId>
// <artifactId>kafka-clients</artifactId>
// <version>3.3.2</version>
// </dependency>
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import java.time.Duration
import scala.jdk.CollectionConverters._
/**
 * Minimal Kafka "pipe": subscribes to [[READ_TOPIC]] and re-publishes every record,
 * with the same key and value, to [[WRITE_TOPIC]].
 *
 * The loop runs until the JVM is asked to shut down (e.g. Ctrl-C / SIGTERM). A shutdown
 * hook then interrupts the blocking `poll` via `consumer.wakeup()`, and both clients are
 * closed so that buffered producer records are flushed and the consumer leaves its group
 * cleanly.
 *
 * An explicit `main` is used instead of the `App` trait so that the shutdown-hook closure
 * never runs against a partially initialised object.
 */
object ConsumerProducerExample {
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
import org.apache.kafka.common.errors.WakeupException

val READ_TOPIC = "input_topic"
val WRITE_TOPIC = "output_topic"

/** Reports asynchronous send failures, which are otherwise silently dropped. */
private val logSendFailure: Callback = new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception != null) {
      System.err.println(s"Failed to write record to $WRITE_TOPIC: $exception")
    }
}

/**
 * Program entry point: builds the consumer and producer, then forwards records until shutdown.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val consumerProps = new Properties()
  consumerProps.put("bootstrap.servers", "localhost:29092")
  consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  consumerProps.put("group.id", "external-computer")

  val producerProps = new Properties()
  producerProps.put("bootstrap.servers", "localhost:29092")
  producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val consumer = new KafkaConsumer[String, String](consumerProps)
  try {
    val producer = new KafkaProducer[String, String](producerProps)
    try {
      // wakeup() is the one KafkaConsumer method that is safe to call from another thread;
      // it makes the blocked poll() throw WakeupException. The hook then waits for the main
      // thread so the finally-blocks below get to close both clients before the JVM exits.
      val mainThread = Thread.currentThread()
      sys.addShutdownHook {
        consumer.wakeup()
        mainThread.join()
      }

      consumer.subscribe(java.util.Collections.singletonList(READ_TOPIC))
      try {
        while (true) {
          val records = consumer.poll(Duration.ofMillis(100))
          for (record <- records.asScala) {
            println(s"Read record ${record.key()} :: ${record.value()}")
            val forwarded = new ProducerRecord[String, String](WRITE_TOPIC, record.key(), record.value())
            producer.send(forwarded, logSendFailure)
          }
        }
      } catch {
        // Expected during shutdown: raised by poll() after wakeup(); nothing to handle.
        case _: WakeupException => ()
      }
    } finally {
      // close() flushes any records still buffered by the producer.
      producer.close()
    }
  } finally {
    consumer.close()
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment