Last active
December 17, 2015 07:38
-
-
Save davideanastasia/b0bef569b4b7dec66c3f to your computer and use it in GitHub Desktop.
Spark Cassandra Connector Enum fiasco
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import org.apache.spark.{SparkConf, SparkContext} | |
| import com.datastax.spark.connector._ | |
| import com.datastax.spark.connector.types._ | |
| import scala.reflect.runtime.universe._ | |
object Days {

  /** A day of the week, backed by a stable integer id (MON = 0 … SUN = 6). */
  sealed abstract class WeekDay(val id: Int)

  case object MON extends WeekDay(0)
  case object TUE extends WeekDay(1)
  case object WED extends WeekDay(2)
  case object THU extends WeekDay(3)
  case object FRI extends WeekDay(4)
  case object SAT extends WeekDay(5)
  case object SUN extends WeekDay(6)

  object WeekDay {
    // Single source of truth, in id order; a case object's toString is its
    // declared name ("MON", "TUE", …), so both maps derive from this list.
    private val members: Seq[WeekDay] = Seq(MON, TUE, WED, THU, FRI, SAT, SUN)

    val intMapping: Map[Int, WeekDay]       = members.map(d => d.id -> d).toMap
    val stringMapping: Map[String, WeekDay] = members.map(d => d.toString -> d).toMap

    /** Look up a day by numeric id; unknown ids default to MON. */
    def apply(id: Int): WeekDay = intMapping.getOrElse(id, MON)

    /** Look up a day by name ("MON" … "SUN"); unknown names default to MON. */
    def apply(id: String): WeekDay = stringMapping.getOrElse(id, MON)

    // Extractor kept for pattern-matching on raw ids; always succeeds for
    // non-null input (Option(id) on a primitive Int is always Some).
    def unapply(id: Int): Option[Int] = Option(id)
  }
}
| import Days._ | |
object Example {

  /** Reads a Cassandra int column into a [[Days.WeekDay]] case object. */
  object IntToWeekDayConverter extends TypeConverter[WeekDay] {
    def targetTypeTag = typeTag[WeekDay]
    def convertPF = { case i: Int => WeekDay(i) }
  }

  /** Writes a [[Days.WeekDay]] back to Cassandra as a java.lang.Integer. */
  object WeekDayToIntConverter extends TypeConverter[Integer] {
    def targetTypeTag = typeTag[Integer]
    // Box explicitly instead of relying on the implicit Predef.int2Integer
    // conversion — the converter's target type is java.lang.Integer.
    def convertPF = { case wd: WeekDay => Integer.valueOf(wd.id) }
  }

  /** One row of the `db.custom_data` table (id text, weight int, day int). */
  case class MyCassandraRow(id: String, weight: Int, day: WeekDay)

  def main(args: Array[String]): Unit = {
    // Converters must be registered before any read/write so the connector
    // can map the `day` int column to/from WeekDay.
    TypeConverter.registerConverter(IntToWeekDayConverter)
    TypeConverter.registerConverter(WeekDayToIntConverter)

    val conf = new SparkConf()
      .setAppName("cassandra-connector-example")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    try {
      val data = sc.parallelize(
        Seq(
          MyCassandraRow("identifier1", 10, MON),
          MyCassandraRow("identifier2", 20, FRI),
          MyCassandraRow("identifier3", 1, SUN)
        )
      )
      data.saveToCassandra("db", "custom_data")

      // Read the rows back and print them. (The original bound this Unit
      // result to a val named `readData`, which was misleading.)
      sc.cassandraTable[MyCassandraRow]("db", "custom_data")
        .collect()
        .foreach(println)
    } finally {
      // Always release the SparkContext, even if the Cassandra I/O fails.
      sc.stop()
    }
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Recreate the example keyspace from scratch (single-node dev setup).
DROP KEYSPACE IF EXISTS db;
CREATE KEYSPACE db WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

USE db;

-- Target table for the Spark example: `day` stores WeekDay.id (0 = MON … 6 = SUN).
CREATE TABLE custom_data (
    id     text,
    weight int,
    day    int,
    PRIMARY KEY (id)
);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment