Concise example of how to write an Avro record out as JSON in Scala
import java.io.{ByteArrayOutputStream, File}

import org.apache.avro.SchemaBuilder
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.hadoop.fs.Path
import parquet.avro.{AvroParquetReader, AvroParquetWriter}

import scala.util.control.Breaks.{break, breakable}

object HelloAvro {
  def main(args: Array[String]): Unit = {
    // Build a schema: a "person" record with a string name and an int ID
    val schema = SchemaBuilder
      .record("person")
      .fields
      .name("name").`type`().stringType().noDefault()
      .name("ID").`type`().intType().noDefault()
      .endRecord

    // Build an object conforming to the schema
    val user1 = new GenericRecordBuilder(schema)
      .set("name", "Jeff")
      .set("ID", 1)
      .build

    // JSON encoding of the object (a single record)
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val baos = new ByteArrayOutputStream
    val jsonEncoder = EncoderFactory.get.jsonEncoder(schema, baos)
    writer.write(user1, jsonEncoder)
    jsonEncoder.flush()
    println("JSON encoded record: " + baos)

    // Binary encoding of the object (a single record)
    baos.reset()
    val binaryEncoder = EncoderFactory.get.binaryEncoder(baos, null)
    writer.write(user1, binaryEncoder)
    binaryEncoder.flush()
    // Render the bytes as hex; printing the array itself would only show its identity hash
    println("Binary encoded record (hex): " + baos.toByteArray.map(x => f"$x%02x").mkString(" "))

    // Build another object conforming to the schema
    val user2 = new GenericRecordBuilder(schema)
      .set("name", "Sam")
      .set("ID", 2)
      .build

    // Write both records to an Avro object container file
    val file = new File("users.avro")
    file.deleteOnExit()
    val dataFileWriter = new DataFileWriter[GenericRecord](writer)
    dataFileWriter.create(schema, file)
    dataFileWriter.append(user1)
    dataFileWriter.append(user2)
    dataFileWriter.close()

    // Read the records back from the container file
    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val dataFileReader = new DataFileReader[GenericRecord](file, datumReader)
    var user: GenericRecord = null
    while (dataFileReader.hasNext) {
      // Pass the previous record back in so Avro can reuse it rather than allocate a new one
      user = dataFileReader.next(user)
      println("Read user from Avro file: " + user)
    }
    dataFileReader.close()

    // Write both records to a Parquet file
    val tmp = File.createTempFile(getClass.getSimpleName, ".tmp")
    tmp.deleteOnExit()
    tmp.delete()
    val tmpParquetFile = new Path(tmp.getPath)
    val parquetWriter = new AvroParquetWriter[GenericRecord](tmpParquetFile, schema)
    parquetWriter.write(user1)
    parquetWriter.write(user2)
    parquetWriter.close()

    // Read both records back from the Parquet file; read() returns null at end of file.
    // Note that break only works inside a breakable block
    val parquetReader = new AvroParquetReader[GenericRecord](tmpParquetFile)
    breakable {
      while (true) {
        Option(parquetReader.read) match {
          case Some(matchedUser) => println("Read user from Parquet file: " + matchedUser)
          case None => println("Finished reading Parquet file"); break
        }
      }
    }
    parquetReader.close()
  }
}
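To build and run this, the code needs Avro, an Avro-Parquet binding, and a Hadoop client on the classpath. A minimal build.sbt sketch follows; the versions are illustrative assumptions, and since the parquet.avro import above predates the org.apache.parquet package rename, an older parquet-avro artifact is assumed:

    libraryDependencies ++= Seq(
      "org.apache.avro"   % "avro"          % "1.7.7",  // SchemaBuilder, GenericRecord, EncoderFactory
      "com.twitter"       % "parquet-avro"  % "1.6.0",  // provides the pre-rename parquet.avro package
      "org.apache.hadoop" % "hadoop-client" % "2.6.0"   // org.apache.hadoop.fs.Path
    )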
Thanks for a great example! However, I noticed a small bug in the binary-encoding step:

    println("Binary encoded record: " + baos.toByteArray)

It prints the byte array's default toString (its type and identity hash) rather than its contents. I suggest the following fix:

    println("Binary encoded record (hex): " + baos.toByteArray.map(x => f"$x%02x").mkString(" "))
Thanks for this piece of code! I'm new to Scala, so this is really useful. Could you please provide the Avro schema that the schema val corresponds to, and the JSON output?
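For reference, the fluent SchemaBuilder calls above should correspond to this Avro schema JSON, which is what schema.toString returns, modulo whitespace:

    {
      "type": "record",
      "name": "person",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "ID", "type": "int"}
      ]
    }

and the JSON encoder should emit the first record as:

    {"name":"Jeff","ID":1}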