Created
July 14, 2015 16:47
-
-
Save dosht/168629b30e1880f7d59b to your computer and use it in GitHub Desktop.
Some Spark/Cassandra Examples
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package exampels | |
import com.datastax.spark.connector.cql.CassandraConnector | |
import org.apache.spark.{ SparkConf, SparkContext } | |
import org.apache.spark.sql.cassandra.CassandraSQLContext | |
import org.apache.spark.SparkContext._ | |
import com.datastax.spark.connector._ | |
/**
 * Spark / Cassandra walkthrough against the `test` and `musicdb` keyspaces.
 *
 * It covers reading tables into case classes, writing rows back, RDD joins,
 * Spark SQL, and a few data-integrity checks: uniqueness, referential
 * integrity, duplication integrity and semantic constraints.
 *
 * All work runs from `main` rather than the object body. Spark's docs advise
 * defining `main` instead of running a job from an object initialiser or
 * `scala.App`. An initialiser would hold the object's class-init lock for the
 * whole job.
 *
 * Usage: `CassandraSparkExamples [host]`. `host` is used for both the Spark
 * master and the Cassandra contact point, and defaults to 10.20.30.10.
 */
object CassandraSparkExamples {

  /** Host used for the Spark master and Cassandra when no argument is given. */
  private val DefaultHost = "10.20.30.10"

  // Row types. NOTE(review): `.as(Companion)` and tuple writes appear to map
  // columns by position in table column order, so field order must follow the
  // table definitions. Confirm against the connector version in use.

  /** Row of `test.kv`. */
  case class KV(key: String, value: Option[Int])

  /** Row of `musicdb.album` (`traks` spelling kept: renaming changes the public type). */
  case class Album(title: String, year: Int, genre: String, performer: String, traks: Map[Int, String])

  /** Row of `musicdb.performer`; `type` is back-quoted because it is a Scala keyword. */
  case class Performer(name: String, born: Option[Int], country: Option[String], died: Option[Int], founded: Option[Int], style: Option[String], `type`: Option[String])

  /** Row of `musicdb.performers_by_style`. */
  case class PerformersByStyle(style: String, name: String)

  /** Row of `musicdb.albums_by_track`. */
  case class AlbumsByTrack(track_title: String, performer: String, year: Int, album_title: String)

  /**
   * Entry point.
   *
   * @param args optional `args(0)`: host of the Spark master (port 7077) and
   *             of the Cassandra contact point
   */
  def main(args: Array[String]): Unit = {
    val host = args.headOption.getOrElse(DefaultHost)
    val conf = new SparkConf(true)
      .setMaster(s"spark://$host:7077")
      .set("spark.cassandra.connection.host", host)
      .setJars(Array("scala-2.10/spark-examples_2.10-0.1-SNAPSHOT.jar"))
      .setAppName("test")
    val sc = new SparkContext(conf)
    // Always release the cluster resources, even if one of the examples throws.
    try run(sc)
    finally sc.stop()
    println("DONE")
  }

  /** Runs every example in order against an already-connected context. */
  private def run(sc: SparkContext): Unit = {
    // --- Cassandra table and mapping ---
    // Print the rows themselves: `println(array)` would only show `[L...;@hash`.
    sc.cassandraTable("test", "kv").collect().foreach(println)

    sc.cassandraTable("test", "kv")
      .select("key", "value")
      .where("key = ?", "key2")
      .as(KV)
      .collect()
      .foreach(println)

    // --- Album table ---
    val albums = sc.cassandraTable("musicdb", "album").as(Album)
    albums.take(1).foreach(println)

    // `unapply` turns each Album into a tuple, which is then saved to the table.
    sc.parallelize(List(Album("x1", 1900, "y1", "p1", Map(1 -> "track1"))))
      .flatMap(Album.unapply)
      .saveToCassandra("musicdb", "album")

    // --- Performer table ---
    val performers = sc.cassandraTable("musicdb", "performer").as(Performer)
    performers.take(1).foreach(println)

    // --- JOIN: albums with their performer, keyed by performer name ---
    val albumPerformerPairs = albums.map(a => a.performer -> a) join performers.map(p => p.name -> p)
    albumPerformerPairs.take(1).foreach(println)

    // --- SQL ---
    val csc = new CassandraSQLContext(sc)
    csc.cassandraSql("SELECT * FROM musicdb.album").take(1).foreach(println)
    csc.setKeyspace("musicdb")

    // === Syntactic constraints ===
    // ... Uniqueness: performer names listed under more than one style.
    csc.sql("SELECT name FROM performers_by_style GROUP BY name HAVING COUNT(style) > 1")
      .collect()
      .foreach(println)
    // ------ or, with plain RDDs
    val performersByStyle = sc.cassandraTable("musicdb", "performers_by_style").as(PerformersByStyle)
    performersByStyle
      .map(_.name -> 1)
      .reduceByKey(_ + _)
      .filter(_._2 > 1)
      .collect()
      .foreach(println)

    // ... Referential integrity: names in performers_by_style with no matching performer row.
    // Aliases are required; an unqualified `name = name` is ambiguous.
    csc.sql(
      "SELECT s.name FROM performers_by_style s " +
        "LEFT OUTER JOIN performer p ON (s.name = p.name) WHERE p.name IS NULL"
    ).collect().foreach(println)
    // ------ or, with plain RDDs: the referencing side must be on the left of the join.
    val performerNames = performers.map(p => p.name -> 1)
    val referencedNames = performersByStyle.map(p => p.name -> 1)
    (referencedNames leftOuterJoin performerNames)
      .filter { case (_, (_, performer)) => performer.isEmpty }
      .keys
      .distinct()
      .collect()
      .foreach(println)

    // ... Duplication integrity: album data copied into albums_by_track must exist in album.
    // Compare the album title of each track row (not the track title) with album.title.
    val albumsByTrack = sc.cassandraTable("musicdb", "albums_by_track").as(AlbumsByTrack)
    val albumsPairs = albums.map(a => (a.title, a.year) -> 1)
    val albumsByTrackPairs = albumsByTrack.map(a => (a.album_title, a.year) -> 1)
    (albumsByTrackPairs leftOuterJoin albumsPairs)
      .filter { case (_, (_, album)) => album.isEmpty }
      .keys
      .distinct()
      .collect()
      .foreach(println)

    // === Semantic constraints: performers recorded as born after they died ===
    csc.sql("SELECT name AS zombie FROM performer WHERE born > died").collect().foreach(println)

    // Pause 10 s before the context is stopped.
    // NOTE(review): presumably keeps the Spark web UI reachable; drop if not needed.
    Thread.sleep(10000)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment