Created
March 11, 2021 09:08
-
-
Save scf37/856dfcfb429b681e370c288f575b6eb7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io._ | |
/**
 * Schema for efficient binary serialization of case classes.
 *
 * Implementations must:
 *  - declare all fields using the `required` and `optional` methods
 *  - implement `doRead`, transferring field values from the declarations to a case class instance
 *  - assign every field a unique tag (checked on first read/write; duplicates raise IllegalStateException)
 *
 * Protocol.
 * Required fields are sorted by tag and serialized first, without any additional metadata.
 * Optional fields are sorted by tag and each one is prefixed by its tag (encoded with `BinaryCodec[Int]`);
 * `None` is serialized by writing nothing at all.
 * There is no length prefix or terminator: `read` keeps decoding optional fields while `is.available() > 0`,
 * so a serialized record is expected to extend to the end of the stream it is read from.
 *
 * Compatibility.
 * Backward-compatible changes (a reader using the new schema can decode data written with the old one):
 *  - adding optional fields
 *  - re-arranging field declarations without changing tags
 * Backward-incompatible changes:
 *  - adding or removing required fields
 *  - removing optional fields
 *  - changing field tags
 * An older reader fails with IllegalArgumentException on an optional tag it does not know.
 *
 * @tparam A case class type being serialized
 */
trait BinarySchema[A] extends BinaryCodec[A] {

  sealed trait Field[B] {
    /**
     * Extract this field's value from the read context during deserialization.
     * Only meaningful inside `doRead` of the schema that declared the field.
     *
     * @param ctx context handed to `doRead`
     * @return field value (`None` for an optional field absent from the input)
     */
    def apply()(implicit ctx: ReadContext): B
  }

  /** Decoded field values of one record; individual values are obtained through `Field.apply`. */
  trait ReadContext

  /**
   * Declare a required field. Required fields are written without a tag prefix (saving the tag's varint bytes,
   * one byte for tags 0..127) but cannot be added or removed compatibly.
   *
   * @param get value extractor
   * @param tag tag, unique within this schema
   * @tparam B field value type
   * @return field handle, used in `doRead` to fetch the decoded value
   */
  def required[B: BinaryCodec](get: A => B, tag: Int): Field[B] = {
    val f = FieldImpl[B](tag, true, (a, os) => BinaryCodec[B].write(get(a), os), BinaryCodec[B].read)
    fields = fields :+ f
    f
  }

  /**
   * Declare an optional field. `None` is written as zero bytes; `Some(b)` is written as the tag followed by `b`.
   *
   * @param get value extractor
   * @param tag tag, unique within this schema
   * @tparam B field value type
   * @return field handle, yielding `None` when the field was absent from the input
   */
  def optional[B: BinaryCodec](get: A => Option[B], tag: Int): Field[Option[B]] = {
    val f = FieldImpl[Option[B]](tag, false, (a, os) => {
      get(a).foreach { b =>
        BinaryCodec[Int].write(tag, os)
        BinaryCodec[B].write(b, os)
      }
    }, is => Some(BinaryCodec[B].read(is)))
    fields = fields :+ f
    f
  }

  /**
   * Create a case class instance from the read context.
   *
   * @param ctx context
   * @return assembled case class
   */
  protected def doRead(implicit ctx: ReadContext): A

  // All declared fields in declaration order, appended to by `required` / `optional`.
  private var fields = Vector.empty[FieldImpl[_]]

  /** Field declarations preprocessed once so every read/write can reuse them. */
  private final class CompiledSchema(
    val requiredFields: List[FieldImpl[_]],
    val optionalFields: List[FieldImpl[_]],
    val optionalByTag: Map[Int, FieldImpl[_]],
    val defaults: Map[Int, Any]
  )

  // A lazy val publishes the complete compiled schema in one step and initializes it under a lock, so no thread
  // can observe a half-built schema. If validation throws, the lazy val stays uninitialized and the next access
  // validates (and fails) again instead of silently proceeding.
  private lazy val schema: CompiledSchema = {
    // Tags must be unique across required AND optional fields: read stores values keyed by tag only.
    val duplicateTags = fields.groupBy(_.tag).collect { case (tag, fs) if fs.size > 1 => tag }.toList.sorted
    if (duplicateTags.nonEmpty)
      throw new IllegalStateException(s"Duplicate tags detected: ${duplicateTags.mkString(", ")}")
    val (req, opt) = fields.partition(_.required)
    val optionalSorted = opt.sortBy(_.tag).toList
    new CompiledSchema(
      requiredFields = req.sortBy(_.tag).toList,
      optionalFields = optionalSorted,
      optionalByTag = optionalSorted.map(f => f.tag -> f).toMap,
      // Absent optional fields decode as None.
      defaults = optionalSorted.map(f => f.tag -> (None: Any)).toMap
    )
  }

  private case class FieldImpl[B](
    tag: Int,
    required: Boolean,
    write: (A, DataOutputStream) => Unit,
    read: DataInputStream => B
  ) extends Field[B] {
    override def apply()(implicit ctx: ReadContext): B =
      ctx.asInstanceOf[ReadContextImpl].data(tag).asInstanceOf[B]
  }

  private case class ReadContextImpl(data: Map[Int, Any]) extends ReadContext

  override def read(is: DataInputStream): A = {
    val s = schema
    var data = s.defaults
    s.requiredFields.foreach { f =>
      data += f.tag -> f.read(is)
    }
    // Optional section has no terminator; relies on available() reaching 0 at the end of the record.
    // NOTE(review): available() is only an estimate for some stream types (e.g. sockets) — confirm callers
    // always read from a stream that ends exactly where the record ends.
    while (is.available() > 0) {
      val tag = BinaryCodec[Int].read(is)
      val f = s.optionalByTag
        .getOrElse(tag, throw new IllegalArgumentException(s"Unknown tag in input stream: $tag"))
      data += tag -> f.read(is)
    }
    doRead(ReadContextImpl(data))
  }

  override def write(a: A, os: DataOutputStream): Unit = {
    val s = schema
    s.requiredFields.foreach(_.write(a, os))
    // Tag order makes the output canonical; readers accept optional fields in any order.
    s.optionalFields.foreach(_.write(a, os))
  }
}
/**
 * Binary codec, capable of encoding a value of type A into bytes and decoding it back.
 *
 * @tparam A type of value handled by this codec
 */
trait BinaryCodec[A] {
  /** Decode one value from `is`. */
  def read(is: DataInputStream): A

  /** Encode `a` into `os`. */
  def write(a: A, os: DataOutputStream): Unit
}

/**
 * Codec summoner, varint helpers and built-in codec instances.
 *
 * Int, Long, lengths and chars use an unsigned LEB128-style varint: 7 payload bits per byte, least-significant
 * group first, with the high bit set on every byte except the last.
 */
object BinaryCodec {
  /** Summon the implicit codec for A. */
  def apply[A: BinaryCodec]: BinaryCodec[A] = implicitly

  // Writes `value` as an unsigned varint; a negative Long therefore always takes the full 10 bytes.
  private def writeVarint(value: Long, os: OutputStream): Unit = {
    var v = value
    while ((v & ~0x7FL) != 0L) {
      os.write(((v & 0x7F) | 0x80).toInt)
      v >>>= 7
    }
    os.write(v.toInt)
  }

  // Reads a varint produced by writeVarint. A byte starting at bit offset >= maxBits is rejected as too long.
  private def readVarint(is: InputStream, maxBits: Int): Long = {
    var result = 0L
    var shift = 0
    var more = true
    while (more) {
      val b = is.read()
      if (b == -1) throw new IllegalArgumentException("Unexpected eof when reading varint")
      if (shift >= maxBits) throw new IllegalArgumentException("Variable length quantity is too long")
      result |= (b & 0x7FL) << shift
      shift += 7
      more = (b & 0x80) != 0
    }
    result
  }

  // String/Seq lengths are written from a non-negative Int, so read them with the full Int range
  // (narrower limits made long strings/seqs writable but unreadable).
  private def readLength(is: InputStream): Int = {
    val len = readVarint(is, 32)
    if (len > Int.MaxValue) throw new IllegalArgumentException(s"Invalid length in input stream: $len")
    len.toInt
  }

  implicit val byteCodec: BinaryCodec[Byte] = new BinaryCodec[Byte] {
    override def read(is: DataInputStream): Byte = is.readByte()
    override def write(a: Byte, os: DataOutputStream): Unit = os.writeByte(a)
  }

  implicit val intCodec: BinaryCodec[Int] = new BinaryCodec[Int] {
    override def read(is: DataInputStream): Int = readVarint(is, 32).toInt
    // Encode the unsigned 32-bit pattern: widening a negative Int to Long would sign-extend it into a 10-byte
    // varint that `read` rejects. Non-negative values encode exactly as before.
    override def write(a: Int, os: DataOutputStream): Unit = writeVarint(a & 0xFFFFFFFFL, os)
  }

  implicit val longCodec: BinaryCodec[Long] = new BinaryCodec[Long] {
    override def read(is: DataInputStream): Long = readVarint(is, 64)
    override def write(a: Long, os: DataOutputStream): Unit = writeVarint(a, os)
  }

  /** Length followed by one varint per UTF-16 code unit (surrogate pairs are written as two units). */
  implicit val stringCodec: BinaryCodec[String] = new BinaryCodec[String] {
    override def read(is: DataInputStream): String = {
      val len = readLength(is)
      // Grow on demand instead of allocating `len` chars up front: a corrupt length then fails at EOF
      // rather than forcing a huge allocation.
      val sb = new java.lang.StringBuilder(math.min(len, 1024))
      var i = 0
      while (i < len) {
        sb.append(readVarint(is, 16).toChar)
        i += 1
      }
      sb.toString
    }

    override def write(a: String, os: DataOutputStream): Unit = {
      val len = a.length
      writeVarint(len, os)
      var i = 0
      while (i < len) {
        writeVarint(a.charAt(i), os)
        i += 1
      }
    }
  }

  /** Element count followed by each element in order; decodes into a Vector. */
  implicit def seqCodec[A: BinaryCodec]: BinaryCodec[Seq[A]] = new BinaryCodec[Seq[A]] {
    override def read(is: DataInputStream): Seq[A] = {
      val len = readLength(is)
      val codec = BinaryCodec[A]
      val b = Vector.newBuilder[A]
      var i = 0
      while (i < len) {
        b += codec.read(is)
        i += 1
      }
      b.result()
    }

    override def write(a: Seq[A], os: DataOutputStream): Unit = {
      writeVarint(a.length, os)
      val codec = BinaryCodec[A]
      a.foreach(e => codec.write(e, os))
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment