Created
October 5, 2016 21:43
-
-
Save eliaslevy/155c72b089b14d54a36038d18272f04c to your computer and use it in GitHub Desktop.
Avro SpecificRecord Serde for Kafka
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example.kstreams | |
import org.apache.kafka.common.serialization.Serde | |
import org.apache.kafka.common.serialization.Serializer | |
import org.apache.kafka.common.serialization.Deserializer | |
import java.io.ByteArrayInputStream | |
import java.io.ByteArrayOutputStream | |
import org.apache.avro.io.EncoderFactory | |
import org.apache.avro.io.DecoderFactory | |
import org.apache.avro.specific.SpecificDatumWriter | |
import org.apache.avro.specific.SpecificDatumReader | |
import org.apache.avro.specific.SpecificRecord | |
import org.apache.avro.Schema | |
import java.util.{Map => JMap} | |
/**
 * Kafka [[Serde]] for Avro [[SpecificRecord]] types that also acts as its own
 * [[Serializer]] and [[Deserializer]].
 *
 * Records are written with a [[SpecificDatumWriter]] through a direct binary
 * encoder and read back with a [[SpecificDatumReader]], both built from the
 * single `schema` passed to the constructor. This class adds nothing else to
 * the payload (no schema identifier or header), so both sides must be
 * constructed with a compatible schema.
 *
 * An instance is NOT safe for concurrent use: `serialize` resets and refills
 * the one shared output buffer, and `deserialize` reuses the one shared
 * decoder. Use a separate instance per thread.
 *
 * @param schema Avro schema used for both writing and reading
 * @tparam T     generated Avro record type handled by this serde
 */
class SpecificAvroSerde[T <: SpecificRecord](schema: Schema)
    extends Serde[T] with Serializer[T] with Deserializer[T] {

  // Scratch buffer shared by every serialize() call; reset before each use.
  val out = new ByteArrayOutputStream()

  // Encoder bound to `out` for the lifetime of this instance.
  val encoder = EncoderFactory.get().directBinaryEncoder(out, null)

  // Decoder instance offered to the factory for reuse on every deserialize()
  // call; it has no input stream until the first call binds one.
  val decoder = DecoderFactory.get().directBinaryDecoder(null, null)

  // Kept as `var` so the public members of the class are unchanged.
  var writer = new SpecificDatumWriter[T](schema)
  var reader = new SpecificDatumReader[T](schema)

  /** No configuration is consumed; everything is fixed at construction time. */
  override def configure(configs: JMap[String, _], isKey: Boolean): Unit = {}

  /** @return this instance, which is its own serializer */
  override def serializer(): Serializer[T] = this

  /** @return this instance, which is its own deserializer */
  override def deserializer(): Deserializer[T] = this

  /**
   * Encodes `obj` to Avro binary.
   *
   * @param topic ignored
   * @param obj   record to encode; may be null (e.g. a tombstone)
   * @return the encoded bytes, or null when `obj` is null
   */
  override def serialize(topic: String, obj: T): Array[Byte] =
    if (obj == null) {
      // Pass nulls through instead of failing inside the datum writer.
      null
    } else {
      out.reset()
      writer.write(obj, encoder)
      out.toByteArray()
    }

  /**
   * Decodes Avro binary produced with a compatible schema.
   *
   * @param topic ignored
   * @param bytes encoded record; may be null (e.g. a tombstone)
   * @return the decoded record, or null when `bytes` is null
   */
  override def deserialize(topic: String, bytes: Array[Byte]): T =
    if (bytes == null) {
      // Pass nulls through instead of failing in ByteArrayInputStream.
      null.asInstanceOf[T]
    } else {
      // Read from the decoder the factory hands back rather than assuming the
      // `decoder` field was reconfigured in place.
      val bound = DecoderFactory.get()
        .directBinaryDecoder(new ByteArrayInputStream(bytes), decoder)
      reader.read(null.asInstanceOf[T], bound)
    }

  /** Nothing to release: the only resource held is an in-memory buffer. */
  override def close(): Unit = {}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment