Created
April 15, 2023 08:25
-
-
Save ahoy-jon/7579571847f2ddf5b9147329570527ef to your computer and use it in GitHub Desktop.
Read - write Kafka - Sans protections
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt build definition for the bunnyKafka sample project.
name := "bunnyKafka"
version := "0.1.0-SNAPSHOT"
scalaVersion := "2.13.10"

// The io.confluent artifacts are served from Confluent's own Maven repository,
// so it has to be declared explicitly for dependency resolution to succeed.
resolvers += "confluent" at "https://packages.confluent.io/maven/"

// One version shared by both Confluent artifacts so they cannot drift apart.
val confluentVersion = "7.3.3"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "3.4.0",
  "io.confluent" % "kafka-avro-serializer" % confluentVersion,
  "io.confluent" % "kafka-schema-registry-client" % confluentVersion
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} | |
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} | |
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig | |
import org.apache.avro.generic.GenericRecord | |
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} | |
import java.util.Properties | |
import scala.jdk.CollectionConverters._ | |
/**
 * Sample bridge between two Kafka topics.
 *
 * Consumes Avro [[GenericRecord]] values from the `SMC` topic, prints a one-line summary of
 * each record to stdout and forwards that same summary, as a plain string, to the `SMC2` topic.
 * Records whose `id` / `transaction` fields cannot be read as longs are only printed, not
 * forwarded.
 */
object AvroGenericConsumer {
  import org.apache.kafka.clients.consumer.ConsumerRecord
  import org.apache.kafka.common.errors.WakeupException
  import scala.util.control.NonFatal

  private val SourceTopic = "SMC"
  private val SinkTopic = "SMC2"
  private val BootstrapServers = "localhost:9092"
  private val SchemaRegistryUrl = "http://localhost:8081"

  /** Consumer settings: String keys, Avro values decoded via the schema registry. */
  private def consumerProps(): Properties = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-generic-consumer-group4")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    props.put(
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      classOf[io.confluent.kafka.serializers.KafkaAvroDeserializer]
    )
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SchemaRegistryUrl)
    props
  }

  /**
   * Producer settings: plain String keys and values.
   *
   * Kept separate from the consumer settings so that neither client is handed configuration
   * keys that only make sense for the other one.
   */
  private def producerProps(): Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BootstrapServers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    props
  }

  /**
   * Reads field `name` of `value` as a long.
   *
   * @return `None` when the lookup yields null, yields something other than a
   *         `java.lang.Long`, or throws; a plain cast would turn null into `0` instead.
   */
  private def longField(value: GenericRecord, name: String): Option[Long] =
    try Option(value.get(name)).collect { case n: java.lang.Long => n.longValue() }
    catch { case NonFatal(_) => None }

  /**
   * Prints one consumed record and, when both long fields are readable, forwards its summary
   * line to the sink topic.
   *
   * @param producer producer used to publish the summary line
   * @param record   record just returned by the consumer
   */
  private def forward(
      producer: KafkaProducer[String, String],
      record: ConsumerRecord[String, GenericRecord]
  ): Unit = {
    val value = record.value()
    val summary = for {
      payload <- Option(value) // value is null for records without a payload
      transaction <- longField(payload, "transaction")
      id <- longField(payload, "id")
    } yield s"Key: ${record.key()}, id: $id, transaction: $transaction, Partition: ${record.partition()}, Offset: ${record.offset()}"

    summary match {
      case Some(line) =>
        println(line)
        try {
          // The callback reports failures that only surface after send() has returned.
          producer.send(
            new ProducerRecord[String, String](SinkTopic, line),
            (_, failure) =>
              Option(failure).foreach { e =>
                System.err.println(s"Delivery to $SinkTopic failed for offset ${record.offset()}: $e")
              }
          )
        } catch {
          case NonFatal(e) =>
            System.err.println(s"Could not send offset ${record.offset()} to $SinkTopic: $e")
        }
      case None =>
        println(s"Key: ${record.key()}, Value: $value, Partition: ${record.partition()}, Offset: ${record.offset()}")
    }
  }

  /**
   * Polls the source topic until the JVM is asked to shut down, then closes both clients.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val consumer = new KafkaConsumer[String, GenericRecord](consumerProps())
    val producer = new KafkaProducer[String, String](producerProps())

    // On JVM shutdown, interrupt the blocking poll() via wakeup() and wait for the polling
    // thread to finish closing the clients before the process exits.
    val pollingThread = Thread.currentThread()
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      consumer.wakeup()
      pollingThread.join()
    }))

    try {
      consumer.subscribe(java.util.Collections.singletonList(SourceTopic))
      while (true) {
        val records = consumer.poll(java.time.Duration.ofMillis(100)).asScala
        records.foreach(record => forward(producer, record))
      }
    } catch {
      case _: WakeupException => () // raised by poll() after wakeup(): normal shutdown path
    } finally {
      try consumer.close()
      finally producer.close()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment