There are four possible serialization formats when using Avro:
- Avro JSON Encoding
- Avro Data Serialization (https://avro.apache.org/docs/current/spec.html#Data+Serialization): a binary format with a header that contains the full schema. This is the format usually used when writing Avro files.
- Avro Single Object Encoding (https://avro.apache.org/docs/current/spec.html#single_object_encoding): a binary format with a header that contains only the fingerprint/id of the schema. This is the format typically used with Kafka (a sketch of it follows this list).
- Avro Binary Encoding (https://avro.apache.org/docs/current/spec.html#binary_encoding): a binary format with no header at all; the most compact of the three.
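Single Object Encoding does not appear in the examples below, so here is a minimal sketch of it using the Java library's BinaryMessageEncoder/BinaryMessageDecoder. Note these classes exist only from Avro 1.8 onwards (not in the 1.7.7 dependency used later), and the single-field User schema here is trimmed down just for illustration:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.message.{BinaryMessageDecoder, BinaryMessageEncoder}

object SampleSingleObject {
  def main(args: Array[String]): Unit = {
    val schema = new Schema.Parser().parse(
      """{"type": "record", "name": "User", "fields": [
        |  {"name": "name", "type": "string"}
        |]}""".stripMargin)

    val user = new GenericData.Record(schema)
    user.put("name", "davide")

    // encode() prepends the 2-byte marker C3 01 and the 8-byte
    // CRC-64-AVRO fingerprint of the schema, then the binary body
    val encoder = new BinaryMessageEncoder[GenericRecord](GenericData.get, schema)
    val buffer = encoder.encode(user)

    // decode() checks the marker and fingerprint, then reads the body
    val decoder = new BinaryMessageDecoder[GenericRecord](GenericData.get, schema)
    println(decoder.decode(buffer))
  }
}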
Serialization using the avro4s library, which can generate a schema and a record (GenericRecord) from a case class.
Add library: libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "1.8.3"
Example of Avro Data Serialization:
import java.io.ByteArrayOutputStream

import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream}

object SampleAvro4sData {
  case class User(name: String, favorite_number: Int, favorite_color: String)

  def main(args: Array[String]): Unit = {
    val outputStream = new ByteArrayOutputStream()

    // "data" format: binary records preceded by a header with the full
    // schema, derived automatically from the User case class
    val os = AvroOutputStream.data[User](outputStream)
    os.write(Seq(
      User("davide", 6, "red"),
      User("mark", 4, "white")))
    os.flush()
    os.close()

    val bytes = outputStream.toByteArray

    // Reading back: the schema travels with the data in the header
    val is = AvroInputStream.data[User](bytes)
    val users = is.iterator.toSet
    is.close()

    println("len: " + bytes.length)
    println(users.mkString("\n"))
  }
}
To use Avro Binary Encoding, just change AvroOutputStream.data to AvroOutputStream.binary (and, on the reading side, AvroInputStream.data to AvroInputStream.binary).
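For example, a minimal sketch of the binary variant, assuming the same User case class and avro4s 1.8.x API used above:

import java.io.ByteArrayOutputStream

import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream}

val outputStream = new ByteArrayOutputStream()

// No header is written: the schema is derived from the User case class
// on both the writing and the reading side
val os = AvroOutputStream.binary[User](outputStream)
os.write(User("davide", 6, "red"))
os.flush()
os.close()

val is = AvroInputStream.binary[User](outputStream.toByteArray)
println(is.iterator.toList)
is.close()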
Serialization using the official Java library.
Add library: libraryDependencies += "org.apache.avro" % "avro" % "1.7.7"
Example of Avro Data Serialization and Binary Encoding.
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import scala.collection.mutable
object SampleAvro {
  def main(args: Array[String]): Unit = {
    val schema =
      """
        |{"namespace": "example.avro",
        | "type": "record",
        | "name": "User",
        | "fields": [
        |   {"name": "name", "type": "string"},
        |   {"name": "favorite_number", "type": "int"},
        |   {"name": "favorite_color", "type": "string"}
        | ]
        |}
      """.stripMargin

    import org.apache.avro.generic.GenericData

    val schemaObj = new org.apache.avro.Schema.Parser().parse(schema)

    val user1 = new GenericData.Record(schemaObj)
    user1.put("name", "Alyssa")
    user1.put("favorite_number", 256)
    user1.put("favorite_color", "blue")

    val user2 = new GenericData.Record(schemaObj)
    user2.put("name", "Ben")
    user2.put("favorite_number", 7)
    user2.put("favorite_color", "red")

    // Data serialization (data + schema)
    val bytes = write(List(user1, user2), schemaObj)
    val users = read(bytes, schemaObj)
    println("Data serialization")
    println(users.mkString("\n"))

    // Binary encoding only (data without schema)
    val bytes2 = writeBinary(List(user1, user2), schemaObj)
    val users2 = readBinary(bytes2, schemaObj)
    println("Binary encoding")
    println(users2.mkString("\n"))
  }
  // Data serialization: writes an Avro object container file whose header
  // embeds the full schema
  def write(records: Seq[org.apache.avro.generic.GenericData.Record],
            schema: org.apache.avro.Schema): Array[Byte] = {
    import java.io.ByteArrayOutputStream
    import org.apache.avro.file.DataFileWriter
    import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}

    val outputStream = new ByteArrayOutputStream()
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
    dataFileWriter.create(schema, outputStream)
    for (record <- records)
      dataFileWriter.append(record)
    dataFileWriter.flush()
    dataFileWriter.close()

    outputStream.toByteArray
  }

  def read(bytes: Array[Byte],
           schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
    import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val dataFileReader = new DataFileReader[GenericRecord](inputStream, datumReader)

    import scala.collection.JavaConverters._
    val list = dataFileReader.iterator().asScala.toList
    dataFileReader.close()
    list
  }
  // Binary encoding: raw records with no header; the reader must already
  // know the schema used by the writer
  def writeBinary(records: Seq[org.apache.avro.generic.GenericData.Record],
                  schema: org.apache.avro.Schema): Array[Byte] = {
    import java.io.ByteArrayOutputStream
    import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}

    val outputStream = new ByteArrayOutputStream()
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val encoder = EncoderFactory.get.binaryEncoder(outputStream, null)
    for (record <- records)
      datumWriter.write(record, encoder)
    encoder.flush()

    outputStream.toByteArray
  }

  def readBinary(bytes: Array[Byte],
                 schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
    import org.apache.avro.file.SeekableByteArrayInput
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)

    // There is no header and no record count: keep reading until the
    // decoder reaches the end of the input
    val result = new mutable.ListBuffer[org.apache.avro.generic.GenericRecord]
    while (!decoder.isEnd) {
      val item = datumReader.read(null, decoder)
      result += item
    }
    result.toList
  }
}
The same can also be done with a specific class instead of GenericRecord. One way is to use ReflectDatumWriter/ReflectDatumReader instead of GenericDatumWriter/GenericDatumReader, as in the sketch below.
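A minimal sketch of the reflection-based variant, under a couple of assumptions: the UserClass class is made up for illustration, and it is given a no-arg constructor and mutable fields so that Java reflection can instantiate and populate it (whether your own Scala classes expose their fields in a reflection-friendly way is worth verifying):

import java.io.ByteArrayOutputStream

import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.{ReflectData, ReflectDatumReader, ReflectDatumWriter}

// Plain class instead of GenericRecord; the no-arg constructor lets
// ReflectDatumReader instantiate it when reading
class UserClass(var name: String, var favorite_number: Int, var favorite_color: String) {
  def this() = this(null, 0, null)
  override def toString = s"$name $favorite_number $favorite_color"
}

object SampleAvroReflect {
  def main(args: Array[String]): Unit = {
    // The schema is derived from the class via reflection
    val schema = ReflectData.get.getSchema(classOf[UserClass])

    val outputStream = new ByteArrayOutputStream()
    val writer = new ReflectDatumWriter[UserClass](schema)
    val encoder = EncoderFactory.get.binaryEncoder(outputStream, null)
    writer.write(new UserClass("Alyssa", 256, "blue"), encoder)
    encoder.flush()

    val reader = new ReflectDatumReader[UserClass](schema)
    val decoder = DecoderFactory.get.binaryDecoder(outputStream.toByteArray, null)
    println(reader.read(null, decoder))
  }
}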
The rules to follow to ensure correct schema evolution are described in the specification: https://avro.apache.org/docs/current/spec.html#Schema+Resolution
Note that when reading binary Avro you should always provide the original schema used to write it. It can be shipped in a header (see Data Serialization) or obtained from somewhere else. If you want to read the data into a new schema (a new class), you must provide both the old and the new schema: the old schema is used to decode the binary data, and the new schema is used to map old fields to new fields (following the rules above). A sketch of such a read follows.
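A minimal sketch of the two-schema read, using the same Java library as above (the favorite_pet field and its default value are made up for illustration):

import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object SampleSchemaEvolution {
  def main(args: Array[String]): Unit = {
    val oldSchema = new Schema.Parser().parse(
      """{"type": "record", "name": "User", "fields": [
        |  {"name": "name", "type": "string"}
        |]}""".stripMargin)

    // Evolved schema: a new field must carry a default so that data
    // written with the old schema can still be resolved
    val newSchema = new Schema.Parser().parse(
      """{"type": "record", "name": "User", "fields": [
        |  {"name": "name", "type": "string"},
        |  {"name": "favorite_pet", "type": "string", "default": "none"}
        |]}""".stripMargin)

    // Write a record with the old schema
    val user = new GenericData.Record(oldSchema)
    user.put("name", "Alyssa")
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get.binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](oldSchema).write(user, encoder)
    encoder.flush()

    // Read with both schemas: the old one decodes the bytes, the new one
    // resolves them (favorite_pet is filled with its default)
    val reader = new GenericDatumReader[GenericRecord](oldSchema, newSchema)
    val decoder = DecoderFactory.get.binaryDecoder(out.toByteArray, null)
    println(reader.read(null, decoder))
  }
}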
Thanks