Skip to content

Instantly share code, notes, and snippets.

@dosht
Created July 14, 2015 16:47
Show Gist options
  • Save dosht/168629b30e1880f7d59b to your computer and use it in GitHub Desktop.
Save dosht/168629b30e1880f7d59b to your computer and use it in GitHub Desktop.
Some Spark/Cassandra Examples
package exampels
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.SparkContext._
import com.datastax.spark.connector._
/**
 * A collection of Spark / Cassandra connector examples.
 *
 * Everything runs as part of the object's initializer, top to bottom:
 * it creates a `SparkContext`, reads a few Cassandra tables through the
 * connector, maps rows onto case classes, joins RDDs, runs Cassandra-backed
 * Spark SQL, and demonstrates several data-integrity checks. Most results are
 * computed and discarded; only a few are printed.
 *
 * The master URL, Cassandra host and jar path below are hard-coded for the
 * author's environment and must be adapted before running elsewhere.
 */
object CassandraSparkExamples {

  // Connection
  val conf = new SparkConf(true)
    .setMaster("spark://10.20.30.10:7077")
    .set("spark.cassandra.connection.host", "10.20.30.10")
    .setJars(Array("scala-2.10/spark-examples_2.10-0.1-SNAPSHOT.jar"))
    .setAppName("test")
  val sc = new SparkContext(conf)

  // Cassandra Table and mapping
  val xs = sc.cassandraTable("test", "kv")
  // Join the rows explicitly: println on an Array only prints its JVM reference.
  println(xs.collect().mkString("\n"))

  case class KV(key: String, value: Option[Int])

  // Select two columns, filter on the key, and map each row onto KV.
  sc.cassandraTable("test", "kv")
    .select("key", "value")
    .where("key = ?", "key2")
    .as(KV)
    .collect()

  // Album Table
  case class Album(title: String, year: Int, genre: String, performer: String, traks: Map[Int, String])

  val albums = sc.cassandraTable("musicdb", "album").as(Album)
  println(albums.take(1).mkString("\n"))

  // Album.unapply turns each Album into a tuple of its fields before saving.
  sc.parallelize(List(Album("x1", 1900, "y1", "p1", Map(1 -> "track1"))))
    .flatMap(Album.unapply)
    .saveToCassandra("musicdb", "album")

  // Performer Table
  case class Performer(name: String, born: Option[Int], country: Option[String], died: Option[Int], founded: Option[Int], style: Option[String], `type`: Option[String])

  val performers = sc.cassandraTable("musicdb", "performer").as(Performer)
  println(performers.take(1).mkString("\n"))

  // JOIN: key both RDDs by performer name, then join albums with their performer.
  val performersKeyValPair = performers.map(p => p.name -> p)
  val albumsKeyValPair = albums.map(a => a.performer -> a)
  val albumPerformerPairs = albumsKeyValPair join performersKeyValPair
  albumPerformerPairs.take(1)

  // SQL
  val csc = new CassandraSQLContext(sc)
  csc.cassandraSql("SELECT * FROM musicdb.album")
  csc.setKeyspace("musicdb")

  // Syntactic Constraints
  // ... Uniqueness: names that occur more than once in performers_by_style
  csc.sql("SELECT name FROM performers_by_style GROUP BY name HAVING COUNT(style) > 1").collect
  // ------ or
  case class PerformersByStyle(style: String, name: String)

  sc.cassandraTable("musicdb", "performers_by_style")
    .as(PerformersByStyle)
    .map(_.name -> 1)
    .reduceByKey(_ + _)
    .filter(_._2 > 1)
    .collect

  // ... Referential Integrity: performers_by_style rows whose name has no
  // matching row in performer.
  val performersByStyle = sc.cassandraTable("musicdb", "performers_by_style").as(PerformersByStyle)
  // Columns are qualified with table aliases; a bare `name = name` is ambiguous
  // because both tables expose a `name` column.
  csc.sql("SELECT pbs.name FROM performers_by_style pbs LEFT OUTER JOIN performer p ON pbs.name = p.name WHERE p.name IS NULL").collect
  // ------ or
  val performerPairs = performers.map(p => p.name -> 1)
  val performersByStylePairs = performersByStyle.map(p => p.name -> 1)
  // performers_by_style is the left side, matching the SQL above; the final
  // collect() is needed because the join and filter are lazy on their own.
  (performersByStylePairs leftOuterJoin performerPairs)
    .filter { case (_, (_, matchingPerformer)) => matchingPerformer.isEmpty }
    .collect()

  // ... Duplication Integrity
  case class AlbumsByTrack(track_title: String, performer: String, year: Int, album_title: String)

  val albumsByTrack = sc.cassandraTable("musicdb", "albums_by_track").as(AlbumsByTrack)
  val albumsPairs = albums.map(a => (a.title, a.year) -> 1)
  // Key by album title (not track title) so both sides use the same
  // (album title, year) key.
  val albumsByTrackPairs = albumsByTrack.map(a => (a.album_title, a.year) -> 1)
  // NOTE(review): this reports keys seen more than once across both tables; an
  // album with several track rows will also exceed 1 - confirm the intended check.
  (albumsPairs union albumsByTrackPairs).reduceByKey(_ + _).filter(_._2 > 1).collect()

  // Semantic Constraints: performers whose birth year is after their death year
  csc.sql("SELECT name AS zombie FROM performer WHERE born > died").collect()

  // NOTE(review): presumably a grace period before shutting down - confirm it is still needed.
  Thread.sleep(10000)
  sc.stop()
  println("DONE")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment