Last active
November 4, 2019 13:55
-
-
Save kailuowang/fc9974f66113e2362dac to your computer and use it in GitHub Desktop.
Avro deserializer for Flink's Data Stream API Kafka Source
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.avro.io.DatumReader | |
import org.apache.avro.io.DecoderFactory | |
import org.apache.avro.specific.SpecificDatumReader | |
import org.apache.flink.api.common.typeinfo.TypeInformation | |
import org.apache.flink.api.java.typeutils.TypeExtractor | |
class AvroDeserializationSchema[T <: SpecificRecordBase: ClassTag](private val baselineMessage: () ⇒ T) extends DeserializationSchema[T] { | |
override def deserialize(message: Array[Byte]): T = { | |
val bm = baselineMessage() | |
val dw: DatumReader[T] = new SpecificDatumReader(bm.getSchema) | |
dw.read(bm, DecoderFactory.get.binaryDecoder(message, null)) | |
} | |
override def isEndOfStream(nextElement: T): Boolean = false | |
override def getProducedType(): TypeInformation[T] = { | |
TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment