Skip to content

Instantly share code, notes, and snippets.

@rjhall
Created November 13, 2013 20:12
Show Gist options
  • Save rjhall/7455570 to your computer and use it in GitHub Desktop.
Save rjhall/7455570 to your computer and use it in GitHub Desktop.
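Code that trades CPU for lower memory use compared with cascading.kryo. Instead of buffering each serialized object in RAM, as I did here: https://github.com/Cascading/cascading.kryo/blob/develop/src/jvm/cascading/kryo/KryoSerializer.java#L31, it uses a pair of wrapper streams. A byte-counting stream measures each object's serialized size before the object is written. A length-limited stream is used when reading the object back. If you give Kryo the raw, unwrapped streams, it will read beyond the object boundaries, and Hadoop will die.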
/**
 * Hadoop serialization plug-in that hands out the length-framing
 * [[ModelSerializer]] / [[ModelDeserializer]] pair instead of the stock
 * Kryo ones.
 *
 * Each factory call asks `populatedKryo` (inherited from KryoHadoop) for the
 * Kryo instance to use. Whether that is a fresh instance per call is decided
 * by KryoHadoop; confirm there.
 */
class ModelSerialization extends com.twitter.scalding.serialization.KryoHadoop {

  /** Serializer for values of class `c`; `c` itself is not needed on the write side. */
  override def getSerializer(c : Class[Object]) : Serializer[Object] =
    new ModelSerializer(populatedKryo)

  /** Deserializer that decodes every record as an instance of `c`. */
  override def getDeserializer(c : Class[Object]) : Deserializer[Object] =
    new ModelDeserializer(populatedKryo, c)
}
/**
 * Writes each object as a length-prefixed record:
 * an 8-byte length (via `DataOutputStream.writeLong`) followed by the bytes
 * produced by `kryo.writeObject`.
 *
 * To learn the length without holding the encoded bytes in memory, the object
 * is serialized twice. The first pass goes into a byte counter and the second
 * pass goes to the real stream.
 */
class ModelSerializer(kryo : Kryo) extends Serializer[Object] {

  /** Destination stream supplied by `open`. */
  var o : java.io.OutputStream = _

  def open(os : java.io.OutputStream) : Unit = { o = os }

  /** Flushes and then closes the destination stream. */
  def close : Unit = {
    o.flush()
    o.close()
  }

  /** Number of bytes `kryo.writeObject` emits for `obj`, measured by writing into a counter. */
  private def encodedLength(obj : Object) : Long = {
    val counter = new FakeOutputStream()
    val countingSink = new Output(counter)
    kryo.writeObject(countingSink, obj)
    countingSink.flush()
    counter.sz
  }

  def serialize(s : Object) : Unit = {
    val framed = new java.io.DataOutputStream(o)
    framed.writeLong(encodedLength(s))
    // Second pass: the real bytes, which must match the length written above.
    val payload = new Output(framed)
    kryo.writeObject(payload, s)
    payload.flush()
    framed.flush()
  }
}
/**
 * Reads records written by the length-prefixed format used by ModelSerializer.
 * Each record is an 8-byte length (read with `DataInputStream.readLong`)
 * followed by that many bytes of Kryo output, decoded as an instance of `k`.
 *
 * @param kryo Kryo instance used for decoding
 * @param k    class every record is decoded as (passed to `kryo.readObject`)
 */
class ModelDeserializer(kryo : Kryo, k : Class[Object]) extends Deserializer[Object] {

  /** Source stream supplied by `open`. */
  var i : java.io.InputStream = _

  def open(is : java.io.InputStream) : Unit = {
    i = is
  }

  def close : Unit = {
    i.close()
  }

  /**
   * Decodes the next record. The `s` argument is ignored and a new object is
   * always returned.
   */
  def deserialize(s : Object) : Object = {
    // Plain Long: no need to box the length into java.lang.Long.
    val sz : Long = new java.io.DataInputStream(i).readLong()
    // Cap Kryo's view of the stream at this record. Otherwise Kryo's
    // read-ahead buffer would consume bytes belonging to the next record.
    val frame = new FakeInputStream(sz, i)
    val result = kryo.readObject(new Input(frame), k)
    // If Kryo stopped before the declared end of the frame, consume the
    // leftover bytes so the next call starts exactly at the next length prefix
    // instead of desynchronizing the rest of the stream.
    while (frame.curr < sz && frame.read != -1) {}
    result
  }
}
/**
 * An OutputStream that discards everything written to it and only counts
 * bytes. It is used to measure how long an encoding is without keeping it.
 */
class FakeOutputStream extends java.io.OutputStream {

  /** Total number of bytes written so far. */
  var sz : Long = 0

  override def write(i : Int) : Unit = {
    sz += 1
  }

  // Bulk writes are counted in one step. The inherited implementation calls
  // write(Int) once per byte. Buffered writers (presumably including Kryo's
  // Output on flush) hand over whole chunks through this overload. The
  // argument validation matches the java.io.OutputStream contract; a null
  // array fails with a NullPointerException on `b.length`.
  override def write(b : Array[Byte], off : Int, len : Int) : Unit = {
    if (off < 0 || len < 0 || len > b.length - off) throw new IndexOutOfBoundsException()
    sz += len
  }
}
/**
 * A view of `in` that exposes at most `sz` bytes and then reports end of
 * stream (-1). It lets a buffering reader consume one length-prefixed record
 * without reading into the next one. `in` itself is never closed by this
 * wrapper.
 *
 * @param sz maximum number of bytes to expose
 * @param in underlying stream; bytes past the limit are left unread in it
 */
class FakeInputStream(sz : Long, in : java.io.InputStream) extends java.io.InputStream {

  /** Bytes actually delivered so far (never exceeds `sz`). Long, to match `sz`. */
  var curr : Long = 0L

  private def remaining : Long = sz - curr

  // Returns -1 at the limit, per the InputStream EOF contract, instead of
  // throwing. The counter advances only when a byte was actually read.
  override def read : Int =
    if (remaining <= 0L) -1
    else {
      val b = in.read()
      if (b >= 0) curr += 1
      b
    }

  // Bulk read that forwards a single bounded request to `in`. The inherited
  // version would call read() once per byte.
  override def read(buf : Array[Byte], off : Int, len : Int) : Int = {
    if (off < 0 || len < 0 || len > buf.length - off) throw new IndexOutOfBoundsException()
    if (len == 0) 0
    else if (remaining <= 0L) -1
    else {
      val n = in.read(buf, off, math.min(len.toLong, remaining).toInt)
      if (n > 0) curr += n
      n
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment