Last active: June 18, 2016 10:06
-
-
Save khajavi/cc3c684019721bf12f5bf81877939b3f to your computer and use it in GitHub Desktop.
Apache Spark Kryo Encoder
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Plain (non-case) class wrapping an `Int`; `BarEncoders` supplies a Kryo-based Encoder for it. */
class Bar(i: Int) {

  /** The wrapped value. */
  def bar: Int = i

  /** Renders as `bar <value>`. */
  override def toString: String = s"bar $i"
}
/** Holds the implicit Kryo encoder for [[Bar]]; bring it into scope with `import BarEncoders._`. */
object BarEncoders {
  import org.apache.spark.sql.{Encoder, Encoders}

  implicit def barEncoder: Encoder[Bar] = Encoders.kryo[Bar]
}
/** Member-less abstract type; the script instantiates it as anonymous subclasses (`new Foo {}`). */
abstract class Foo {}
/** Holds the implicit Kryo encoder for [[Foo]]; bring it into scope with `import FooEncoders._`. */
object FooEncoders {
  import org.apache.spark.sql.{Encoder, Encoders}

  implicit def fooEncoder: Encoder[Foo] = Encoders.kryo[Foo]
}
| import BarEncoders._ | |
| import FooEncoders._ | |
// Build one Dataset per type; each picks up its Kryo encoder from the BarEncoders._ /
// FooEncoders._ imports. Statement order is preserved so both are created before either is shown.
val ds = Seq(new Bar(1)).toDS()
val ds2 = Seq(new Foo {}).toDS()
ds.show()
ds2.show()
/** Common parent of the case classes [[A]] and [[B]]; `BaseEncoder` declares a Kryo encoder for it.
  *
  * NOTE(review): deliberately not `sealed` — in spark-shell the subclasses may be entered as
  * separate REPL lines (separate compilation units), which `sealed` would reject.
  */
abstract class Base extends Serializable

/** A [[Base]] carrying a name. `final`: case classes should not be extended. */
final case class A(name: String) extends Base

/** A [[Base]] carrying an age. `final`: case classes should not be extended. */
final case class B(age: Int) extends Base
/** Holds the implicit Kryo encoder for [[Base]]; bring it into scope with `import BaseEncoder._`. */
object BaseEncoder {
  // Fully qualified, like barEncoder/fooEncoder: the unqualified `Encoder`/`Encoders` names
  // would need an `org.apache.spark.sql` import that this script does not show.
  implicit def baseEncoder: org.apache.spark.sql.Encoder[Base] =
    org.apache.spark.sql.Encoders.kryo[Base]
}
val listA: List[A] = A("foo") :: A("bar") :: Nil
val listB: List[B] = B(10) :: B(20) :: Nil
val list: List[Base with Product with Serializable] = listA ++ listB
// RDD is fully qualified (like the encoder definitions) since no import for it is visible here.
val result: org.apache.spark.rdd.RDD[Base with Product with Serializable] = sc.parallelize(list)
val r = result.collect()
// Keep only the A elements; the extracted fields are unused, so bind them to `_`.
val r2 = result.filter {
  case A(_) => true
  case B(_) => false
}.collect()
r2.foreach {
  case A(x) => println(x)
  case B(x) => println(x)
}
// println on an Array prints its identity (e.g. "[L...;@1b2c3d"), not its elements; render the
// already-collected contents instead of launching a second collect job.
println(r.mkString(", "))
val listC: List[Base with Product with Serializable] = A("foo") :: A("bar") :: Nil
// NOTE(review): this needs an implicit Encoder[Base with Product with Serializable]; BaseEncoder is
// not imported and only declares Encoder[Base] — confirm whether this line is expected to fail.
sc.parallelize(listC).toDS()
/** Prints the payload of each element of `l`: the name of an [[A]], the age of a [[B]].
  *
  * `Base` is not sealed, so other subclasses can exist; those are printed via their `toString`
  * instead of aborting the whole traversal with a `MatchError`.
  *
  * @tparam T element type, any subtype of `Base`
  * @param l  elements to print, in order
  */
def printList[T <: Base](l: List[T]): Unit =
  l.foreach {
    case A(x)  => println(x)
    case B(x)  => println(x)
    case other => println(other)
  }
// Dataset is fully qualified (like the encoder definitions) since no import for it is visible here.
val dsA: org.apache.spark.sql.Dataset[A] = sc.parallelize(listA).toDS()
val dsB: org.apache.spark.sql.Dataset[B] = sc.parallelize(listB).toDS()
printList(list)
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.