Skip to content

Instantly share code, notes, and snippets.

@vikas-tikoo-zefr
Created November 14, 2018 21:13
Show Gist options
  • Save vikas-tikoo-zefr/bc012a8ad06f1adfc8ffcc49bd4f9e3d to your computer and use it in GitHub Desktop.
Save vikas-tikoo-zefr/bc012a8ad06f1adfc8ffcc49bd4f9e3d to your computer and use it in GitHub Desktop.
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import org.apache.kafka.common.serialization.Serializer
import java.util.HashMap
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroSerializer
/**
 * Kafka [Serializer] that forwards every call to a wrapped [KafkaAvroSerializer].
 *
 * The configuration handed to [configure] is copied, and the
 * `KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG` entry is forced to `true`
 * in that copy before it is passed on to the wrapped serializer.
 */
class StringAvroSerializer<T> private constructor(
    private val inner: KafkaAvroSerializer
) : Serializer<T> {

    /** Builds a serializer around a default-constructed [KafkaAvroSerializer]. */
    constructor() : this(KafkaAvroSerializer())

    /**
     * For testing purposes only: wraps a [KafkaAvroSerializer] created from the
     * supplied schema registry [client].
     */
    internal constructor(client: SchemaRegistryClient) : this(KafkaAvroSerializer(client))

    /**
     * Configures the wrapped serializer with a copy of [serializerConfig] produced by
     * [withSpecificAvroEnabled]; [isSerializerForRecordKeys] is forwarded unchanged.
     */
    override fun configure(serializerConfig: Map<String, *>, isSerializerForRecordKeys: Boolean) {
        val effectiveConfig = withSpecificAvroEnabled(serializerConfig)
        inner.configure(effectiveConfig, isSerializerForRecordKeys)
    }

    /**
     * Delegates serialization of [record] for [topic] to the wrapped serializer.
     *
     * NOTE(review): the declared return type is non-null while the delegate is a Java
     * call; confirm how a null result from the delegate (if one is possible, e.g. for a
     * null record) is meant to be handled.
     */
    override fun serialize(topic: String, record: T): ByteArray = inner.serialize(topic, record)

    /** Closes the wrapped serializer. */
    override fun close() {
        inner.close()
    }

    companion object {
        /**
         * Returns a new map containing every entry of [config] (no entries when [config]
         * is `null`) plus `KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG` set to
         * `true`. The argument itself is never modified.
         */
        fun withSpecificAvroEnabled(config: Map<String, *>?): Map<String, Any> {
            val merged: HashMap<String, Any> =
                if (config != null) HashMap<String, Any>(config) else HashMap()
            // Overrides any value the caller supplied for this key.
            return merged.also { it[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment