Created
November 14, 2018 21:13
-
-
Save vikas-tikoo-zefr/bc012a8ad06f1adfc8ffcc49bd4f9e3d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig | |
import org.apache.kafka.common.serialization.Serializer | |
import java.util.HashMap | |
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient | |
import io.confluent.kafka.serializers.KafkaAvroSerializer | |
class StringAvroSerializer<T> : Serializer<T> { | |
private val inner: KafkaAvroSerializer | |
constructor() { | |
inner = KafkaAvroSerializer() | |
} | |
/** | |
* For testing purposes only. | |
*/ | |
internal constructor(client: SchemaRegistryClient) { | |
inner = KafkaAvroSerializer(client) | |
} | |
override fun configure(serializerConfig: Map<String, *>, | |
isSerializerForRecordKeys: Boolean) { | |
inner.configure( | |
withSpecificAvroEnabled(serializerConfig), | |
isSerializerForRecordKeys) | |
} | |
override fun serialize(topic: String, record: T): ByteArray { | |
return inner.serialize(topic, record) | |
} | |
override fun close() { | |
inner.close() | |
} | |
companion object { | |
fun withSpecificAvroEnabled(config: Map<String, *>?): Map<String, Any> { | |
val specificAvroEnabledConfig = if (config == null) HashMap() else HashMap<String, Any>(config) | |
specificAvroEnabledConfig[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true | |
return specificAvroEnabledConfig | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment