Skip to content

Instantly share code, notes, and snippets.

@khajavi
Last active June 18, 2016 10:06
Show Gist options
  • Select an option

  • Save khajavi/cc3c684019721bf12f5bf81877939b3f to your computer and use it in GitHub Desktop.

Select an option

Save khajavi/cc3c684019721bf12f5bf81877939b3f to your computer and use it in GitHub Desktop.
Apache Spark Kryo Encoder
/** Plain (non-case) class wrapping a single `Int`.
  *
  * @param i the wrapped value, exposed through [[bar]] and rendered by `toString`
  */
class Bar(i: Int) {

  /** The wrapped integer. */
  def bar: Int = i

  /** Renders as `"bar "` followed by the wrapped integer. */
  override def toString: String = s"bar $i"
}
/** Holder for the implicit Spark SQL encoder of [[Bar]].
  *
  * Import `BarEncoders._` to bring the encoder into implicit scope.
  */
object BarEncoders {
  import org.apache.spark.sql.{Encoder, Encoders}

  /** Encoder for [[Bar]] obtained from `Encoders.kryo`. */
  implicit def barEncoder: Encoder[Bar] = Encoders.kryo[Bar]
}
/** Abstract class with no members; the script instantiates it as an anonymous subclass (`new Foo {}`). */
abstract class Foo
/** Holder for the implicit Spark SQL encoder of [[Foo]].
  *
  * Import `FooEncoders._` to bring the encoder into implicit scope.
  */
object FooEncoders {
  import org.apache.spark.sql.{Encoder, Encoders}

  /** Encoder for [[Foo]] obtained from `Encoders.kryo`. */
  implicit def fooEncoder: Encoder[Foo] = Encoders.kryo[Foo]
}
import BarEncoders._
import FooEncoders._
// Build one-element Datasets from a Bar and from an anonymous Foo subclass,
// using the encoders imported from BarEncoders / FooEncoders.
// NOTE(review): `toDS()` presumably comes from the Spark SQL implicits of the
// enclosing session (e.g. spark-shell) — confirm they are in scope.
val ds = Seq(new Bar(1)).toDS()
val ds2 = Seq(new Foo {}).toDS()

// `show()` is a side-effecting call, so it is written with explicit
// parentheses, consistent with the `toDS()` calls above.
ds.show()
ds2.show()
/** Serializable parent type of the [[A]] and [[B]] case classes.
  *
  * NOTE(review): `Base` is not sealed, so pattern matches over `A`/`B` are not
  * checked for exhaustiveness; consider sealing it if all subclasses are
  * compiled together — confirm how this script is loaded first.
  */
abstract class Base extends Serializable
/** A [[Base]] carrying a `name`. */
case class A(name: String) extends Base
/** A [[Base]] carrying an `age`. */
case class B(age: Int) extends Base
/** Holder for the implicit Spark SQL encoder of [[Base]].
  *
  * Import `BaseEncoder._` to bring the encoder into implicit scope.
  */
object BaseEncoder {
  // Imported locally so the object compiles without relying on a file-level
  // import of the Spark SQL encoder types.
  import org.apache.spark.sql.{Encoder, Encoders}

  /** Encoder for [[Base]] obtained from `Encoders.kryo`. */
  implicit def baseEncoder: Encoder[Base] = Encoders.kryo[Base]
}
// Sample data: two A values and two B values.
val listA: List[A] = List(A("foo"), A("bar"))
val listB: List[B] = List(B(10), B(20))

// Concatenation of both lists; the element type is annotated as the
// compound type `Base with Product with Serializable`.
val list: List[Base with Product with Serializable] = listA ++ listB
val result: RDD[Base with Product with Serializable] = sc.parallelize(list)

val r = result.collect()

// Keep only the A elements. Type patterns are used because the payload is
// not needed to decide membership.
val r2 = result.filter {
  case _: A => true
  case _: B => false
}.collect()

// Print the payload of every collected element.
r2.foreach {
  case A(name) => println(name)
  case B(age)  => println(age)
}

// Print the elements themselves: calling println on the Array directly
// would only print the JVM array identity string, not its contents.
println(result.collect().mkString(", "))

val listC: List[Base with Product with Serializable] = List(A("foo"), A("bar"))
// NOTE(review): the element type here is `Base with Product with Serializable`,
// not `Base`, and `BaseEncoder._` is never imported in this script, so
// `BaseEncoder.baseEncoder` is presumably not the encoder resolved for this
// call — confirm which encoder is picked and whether the call succeeds.
sc.parallelize(listC).toDS()
/** Prints the payload of every element of `l`, one per line.
  *
  * For an [[A]] the `name` is printed; for a [[B]] the `age` is printed.
  * Only `A` and `B` are matched, so any other subtype of `Base` in the list
  * raises a `MatchError`.
  *
  * @tparam T element type, a subtype of `Base`
  * @param l the elements to print
  */
def printList[T <: Base](l: List[T]): Unit =
  for (item <- l) item match {
    case A(name) => println(name)
    case B(age)  => println(age)
  }
// Typed Datasets built from the homogeneous lists; the element type is
// spelled out at the parallelize call.
val dsA: Dataset[A] = sc.parallelize[A](listA).toDS()
val dsB: Dataset[B] = sc.parallelize[B](listB).toDS()

// Print the payload of every element of the mixed A/B list.
printList(list)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment