Last active
March 19, 2018 20:52
-
-
Save francescofrontera/c1a52d20d4bf20afee84bdf78f82bf14 to your computer and use it in GitHub Desktop.
Type class for serializing/deserializing Avro-generated classes using SpecificDatumReader and SpecificDatumWriter.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

import io.confluent.connect.avro.{AvroAuthentication, AvroPrimaryBrandCreated}
import org.apache.avro.io.{BinaryDecoder, BinaryEncoder, DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecordBase}

import scala.reflect.ClassTag
/**
 * Type class providing binary Avro serialization and deserialization for a
 * generated record type.
 *
 * The instance itself is `Serializable`; the datum reader and writer are
 * `@transient lazy`, so they are not part of the serialized form and are
 * built on first use.
 *
 * @tparam T the record type; its runtime class is taken from the `ClassTag`
 */
abstract class GenDeserializerOps[T <: SpecificRecordBase: ClassTag] extends Serializable {

  /** Runtime class of `T`, recovered from the implicit `ClassTag`. */
  private def recordClass: Class[T] =
    implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]

  // Declared as two separate members rather than one tuple pattern so that
  // `@transient` is attached directly to each field.
  /** Datum reader for `T`, created lazily. */
  @transient
  lazy val reader: SpecificDatumReader[T] = new SpecificDatumReader[T](recordClass)

  /** Datum writer for `T`, created lazily. */
  @transient
  lazy val writer: SpecificDatumWriter[T] = new SpecificDatumWriter[T](recordClass)

  /**
   * Decodes a record from its binary representation.
   *
   * @param bytes the binary payload to decode
   * @return the record read from `bytes`
   */
  def des(bytes: Array[Byte]): T = {
    // Fixed: the original read from an undefined `buffer` instead of `bytes`.
    // `null` is passed for the decoder-reuse argument and for the record-reuse argument.
    val decoder = DecoderFactory.get.binaryDecoder(bytes, null.asInstanceOf[BinaryDecoder])
    reader.read(null.asInstanceOf[T], decoder)
  }

  /**
   * Encodes a record into its binary representation.
   *
   * @param value the record to encode
   * @return the bytes written by the binary encoder
   */
  def ser(value: T): Array[Byte] = {
    val baos: ByteArrayOutputStream = new ByteArrayOutputStream()
    val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(baos, null)
    writer.write(value, encoder)
    // Fixed: the original called flush on an undefined `enc`. The encoder is
    // flushed before the stream contents are copied out.
    encoder.flush()
    baos.toByteArray
  }
}
/**
 * Companion holding the implicit [[GenDeserializerOps]] instances together
 * with helpers that resolve an instance from implicit scope.
 */
object GenDeserializerOps {

  // Instances for the Avro record classes.
  implicit case object Auth extends GenDeserializerOps[AvroAuthentication]

  implicit case object PrimaryBrand extends GenDeserializerOps[AvroPrimaryBrandCreated]

  /**
   * Summons the implicit instance for `T`.
   *
   * @tparam T the record type an instance is required for
   * @return the instance found in implicit scope
   */
  @inline
  def apply[T <: SpecificRecordBase: GenDeserializerOps]: GenDeserializerOps[T] =
    implicitly[GenDeserializerOps[T]]

  /**
   * Deserializes `byte` into a `T` using the implicit instance for `T`.
   *
   * @param byte the binary payload to decode
   * @return the decoded record
   */
  @inline
  def deserializeData[T <: SpecificRecordBase: GenDeserializerOps](byte: Array[Byte]): T = {
    val ops = implicitly[GenDeserializerOps[T]]
    ops.des(byte)
  }

  /**
   * Serializes `value` to bytes using the implicit instance for `T`.
   *
   * @param value the record to encode
   * @return the encoded bytes
   */
  @inline
  def serializeData[T <: SpecificRecordBase: GenDeserializerOps](value: T): Array[Byte] = {
    val ops = implicitly[GenDeserializerOps[T]]
    ops.ser(value)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment