For anyone who is interested, here's a case study on serialization of Scala closures. It may or may not have been the result of a lengthy and passionate discussion about what you can and can't serialize, and why you need a generating function to build a closure if you want to use it in Spark.
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object Temp {

  def main(args: Array[String]): Unit = {

    // Serialize any value using plain Java serialization.
    def serialize(value: Any): Array[Byte] = {
      val stream = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(stream)
      oos.writeObject(value)
      oos.close()
      stream.toByteArray
    }

    // A is serializable; B is not, even though its only field is an A.
    class A extends Serializable {
      var x = 1
    }
    class B {
      var a = new A
    }

    val a = new A
    val b = new B

    serialize(a)
    println("serialized A")

    try {
      serialize(b)
    } catch {
      case _: NotSerializableException => println("could not serialize B")
    }

    // f captures the whole (non-serializable) instance b.
    val f = () => b.a

    // generator evaluates its argument eagerly, so the returned closure
    // captures only the (serializable) A instance.
    def generator(x: A): () => A = {
      () => x
    }
    val g = generator(b.a)

    try {
      serialize(f)
    } catch {
      case _: NotSerializableException => println("could not serialize f")
    }

    serialize(g)
    println("serialized g")
  }
}
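If everything works as claimed below, running this prints:

serialized A
could not serialize B
could not serialize f
serialized g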
The intent here is to check that the closure f cannot be serialized but the closure g can be, which means a distributed Spark transformation such as rdd.map(f) will throw an exception while rdd.map(g) will succeed.
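For context, here is a minimal Spark sketch of the same point. It is hypothetical: it assumes a local SparkContext, and it adapts the closures to map's element-wise signature, since RDD.map takes a function of one argument rather than a thunk.

import org.apache.spark.{SparkConf, SparkContext}

object SparkTemp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("closure-serialization"))

    class A extends Serializable { var x = 1 }
    class B { var a = new A }
    val b = new B

    val rdd = sc.parallelize(1 to 10)

    // Captures the whole non-serializable b.
    val f = (i: Int) => b.a.x + i

    // Captures only the serializable A extracted from b.
    def generator(a: A): Int => Int = i => a.x + i
    val g = generator(b.a)

    // rdd.map(f).collect() // throws SparkException: Task not serializable
    rdd.map(g).collect()    // succeeds

    sc.stop()
  }
}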
The return result of closure f explicitly depends on b.a, so the closure captures the full instance b of type B, which is not serializable; hence the closure f is not serializable either.
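Roughly speaking, f behaves like a function object whose field holds the whole B instance. This is a hand-written sketch reusing the A and B defined above, not what the compiler literally emits:

class FClosure(captured: B) extends (() => A) with Serializable {
  // Serializing FClosure walks into `captured`, a non-serializable B, and fails.
  def apply(): A = captured.a
}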
The return result of the closure returned by generator(x: A) depends only on x, which is declared among generator's arguments and is an instance of the serializable class A, so the closure g returned by generator(b.a) is serializable as well. Here, b.a is actually evaluated in the main execution scope, and only the attribute a is passed to generator(x: A).
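One can double-check this by round-tripping g through bytes. This is a sketch that assumes the serialize helper above and adds a matching deserialize counterpart:

import java.io.{ByteArrayInputStream, ObjectInputStream}

def deserialize[T](bytes: Array[Byte]): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
  try ois.readObject().asInstanceOf[T] finally ois.close()
}

val g2 = deserialize[() => A](serialize(g))
println(g2().x) // prints 1: the captured A instance survived the round trip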
In casual terms, binding x strips the attribute a away from the instance b of type B, so that the closure g only needs to serialize x of type A.