Last active
August 29, 2015 14:07
-
-
Save ldacosta/ea9d0e2ead06aafe7ca1 to your computer and use it in GitHub Desktop.
I am trying to narrow down an Exception thrown by Spark when using "factories" to create classes. See example below ====> only factory2 works!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.SparkContext | |
import org.apache.spark.rdd.RDD | |
/** | |
* I am trying to narrow down on an Exception thrown by Spark when using "Factories". | |
* The factories have parameters that are used in the classes' functions. | |
* | |
* To run this code: copy-paste this whole content in a Spark-Shell. Execute Test.theMain(sc) | |
* | |
*/ | |
object Test { | |
abstract class A { | |
def aFunction: RDD[Int] => RDD[Int] | |
} | |
/** | |
* Creates a class that uses the parameter of the factory directly | |
*/ | |
def factory1(i: Int) = new A { | |
def aFunction = { | |
rdd: RDD[Int] => rdd.map(_ + i) | |
} | |
} | |
/** | |
* The class returned that creates an instance's local reference (val) of the parameter | |
*/ | |
def factory2(i: Int) = new A { | |
val j = i | |
def aFunction = { | |
rdd: RDD[Int] => rdd.map(_ + j) | |
} | |
} | |
/** | |
* Creates a class whose function has a local reference (val) of the parameter | |
*/ | |
def factory3(i: Int) = new A { | |
def aFunction = { | |
rdd: RDD[Int] => { val j = i; rdd.map(_ + j) } | |
} | |
} | |
/** | |
* Local reference (val) of the parameter in closure | |
*/ | |
def factory4(i: Int) = new A { | |
def aFunction = { | |
rdd: RDD[Int] => rdd.map { anInt => val j = i; anInt + j } | |
} | |
} | |
def theMain(sc: SparkContext) = { | |
val anRDD = sc.parallelize(1 to 9) | |
val runningOptions = List(("Factory1", factory1 _), ("Factory2", factory2 _), ("Factory3", factory3 _), ("Factory4", factory4 _)) | |
runningOptions foreach { case (theName, theFactory) => | |
println(s"RUNNING TEST WITH [${theName}]") | |
try { | |
val rddPlus2 = theFactory(2) | |
val anRDDPlus2 = rddPlus2.aFunction(anRDD) | |
(anRDD zip anRDDPlus2).foreach{ case (i, iPlus2) => println(s"[${theName}] i = ${i}, iPlus2 = ${iPlus2}}") } | |
} | |
catch { | |
case e: Exception => println(s"${theName} ERROR ===> ${e.getMessage}") | |
} | |
} | |
} | |
} |
RUNNING TEST WITH [Factory1]
Factory1 ERROR ===> Task not serializable
RUNNING TEST WITH [Factory2]
Factory2 ERROR ===> Task not serializable
RUNNING TEST WITH [Factory3]
RUNNING TEST WITH [Factory4]
Factory4 ERROR ===> Task not serializable
RUNNING TEST WITH [Factory5]
Factory5 ERROR ===> Task not serializable
RUNNING TEST WITH [Factory6]
defined class A
factory1: (i: Int)A
factory2: (i: Int)A{val j: Int}
factory3: (i: Int)A
factory4: (i: Int)A
factory5: (i: Int)A
factory6: (i: Int)A
anRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[103] at parallelize at <console>:73
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
as per discussion today, another thing to try is to access variables in
aFunction
which is a field in an object. eg.aFunction
is an rdd transformer which increments values by bothi
andMyObject.j
. and then withMyObject.j
you should try different options of imports (wildcard, specific, with/without renamings) and place them in different places and see which factories work