Skip to content

Instantly share code, notes, and snippets.

@ahoy-jon
Created June 26, 2016 21:29
Show Gist options
  • Save ahoy-jon/15aadac1fb1604f990e5f01e8dae30c3 to your computer and use it in GitHub Desktop.
Spark Datasets Api + Shapeless Tags
package com.cym_iot.training.testspark16
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Dataset, Encoder, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import shapeless.tag
import shapeless.tag.@@
/** Phantom type used only as a shapeless tag (`String @@ StrName`) to mark street-name strings at compile time; never instantiated. */
trait StrName
/** Postal address whose street name carries the [[StrName]] tag for extra type safety.
  *
  * @param strName the street name, tagged with `StrName` (create via `tag[StrName]("...")`)
  */
final case class Address(strName: String @@ StrName) // final: case classes should not be extended
/** Simple person record used to exercise Dataset encoding of nested and optional fields.
  *
  * @param name the person's name
  * @param addr the person's addresses (exercises `Seq` support in Datasets)
  * @param opt  an optional extra field (exercises `Option` support); defaults to `None`
  */
final case class Person(name: String, addr: Seq[Address], opt: Option[String] = None) // final: case classes should not be extended
/** Spark SQL encoder support for shapeless-tagged types. */
object sparkshapeless {

  /** Derives an `Encoder[T @@ TAG]` from an existing `Encoder[T]`.
    *
    * Shapeless tags are purely compile-time phantom types — `T @@ TAG` erases to `T` —
    * so the runtime encoder for `T` can be reused unchanged. We cast to the abstract
    * `Encoder` interface rather than the concrete `ExpressionEncoder` implementation:
    * the abstract cast is always safe, whereas casting to `ExpressionEncoder` would
    * throw a `ClassCastException` if a caller ever supplied a different `Encoder`
    * implementation.
    */
  implicit def taggEncoder[T, TAG](implicit encoder: Encoder[T]): Encoder[T @@ TAG] =
    encoder.asInstanceOf[Encoder[T @@ TAG]]
}
object Testspark16 {

  /** Entry point: smoke-tests Spark 1.6 Dataset support for
    * ☑ `Seq` fields, ☑ `Option` fields, and ☑ shapeless-tagged fields
    * (the last one requires the implicit encoder from [[sparkshapeless]]).
    */
  def main(args: Array[String]): Unit = { // explicit `: Unit =` — procedure syntax is deprecated
    val sconf = new SparkConf().setMaster("local").setAppName("test-Spark-1.6")
    val sc: SparkContext = new SparkContext(sconf)
    try {
      val sqlContext: SQLContext = new SQLContext(sc)
      val people: RDD[Person] =
        sc.parallelize(Seq(Person("abc", Seq(Address(tag[StrName]("street"))))))

      import sparkshapeless._ // brings the Encoder[T @@ TAG] derivation into scope
      import sqlContext.implicits._

      val dataset: Dataset[Person] = sqlContext.createDataFrame(people).as[Person]
      // Round-trip through the Dataset API: nested Seq flatMapped, tagged field mapped out.
      val streetNames: Dataset[String @@ StrName] = dataset.flatMap(_.addr).map(_.strName)
      assert(streetNames.collect().toList == List("street"))
    } finally {
      sc.stop() // always release the local Spark context, even if the assertion fails
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment