Spark SQL, Parquet, GraphX examples
import org.apache.spark.sql.{ SQLContext, Row }
val sqlContext = new SQLContext(sc)
import sqlContext._
/*
Next you'll need to register your data as a table. Note that in order to infer the schema (column names and types), it must be a case class. A normal class won't work unless you manually implement much of what a case class gives you, e.g. extend Product and Serializable.
Any data source can be used, as long as you can transform it into an RDD of case classes. For now, we'll just use some dummy data:
*/
case class Person(name: String, age: Int, phones: Seq[String])
case class Likes(name: String, likes: String)
val people = sc.parallelize(Array(
  Person("adam", 19, Seq("123-5767", "234-1544")),
  Person("bob", 23, Seq()),
  Person("cathy", 14, Seq("454-1222", "433-1212", "345-2343")),
  Person("dave", 44, Seq("454-1222"))
))
people.registerAsTable("people")
val likes = sc.parallelize(Array(
  Likes("adam", "cathy"),
  Likes("bob", "dave"),
  Likes("bob", "adam"),
  Likes("cathy", "adam"),
  Likes("cathy", "bob")
))
likes.registerAsTable("likes")
/*
Now that you have tables registered, you can write SQL queries against them to produce an RDD of Row objects that can then be collected or further transformed:
*/
sql("select * from people").collect.foreach(println)
/*
Note that Person has a column, phones, that is a collection type. Seqs are fully supported, but for arrays only Array[Byte] is currently supported. Spark SQL supports indexing into collections using the name[i] syntax, including nested collections via e.g. foo[0][0] (see the sketch after the next query).
*/
sql("""
select name, phones[0] from people where name in ("adam", "cathy")
""").collect
/*
It supports joins
*/
sql("""
select p.name, l.name as liker
from people p
inner join likes l on (p.name = l.likes)
""").collect
/*
Note that Spark SQL is agnostic regarding formatting and capitalization; if you prefer SELECT foo FROM..., that's fine too.
It supports basic aggregations
*/
sql("""
select min(age), max(age), avg(age), sum(age), count(age) from people
""").collect
/*
supports grouping and having
*/
sql("""
select name, count(likes) as k from likes
group by name
having k > 1
""").collect
/*
and supports sorting and limit
*/
sql("""
select name, age from people order by age desc limit 2
""").collect
/*
Note that unlike most relational databases, you must include the ordering column in the projection. The following will not work:
*/
// BAD
sql("""
select name from people order by age desc limit 2
""").collect
/*
It supports distinct
*/
sql("""
select distinct name from likes
""").collect
/*
and approximation, like a normal Scala RDD's countApproxDistinct method. It doesn't seem to support countApprox with a timeout, though.
*/
sql("""
select approximate count(distinct name) from likes
""").collect
/*
parquet save
*/
people.saveAsParquetFile("file:///var/tmp/demo/people")
likes.saveAsParquetFile("file:///var/tmp/demo/likes")
/*
quit and reload; the schema is stored in the parquet file
*/
import org.apache.spark.sql.{ SQLContext, Row }
val sqlContext = new SQLContext(sc)
import sqlContext._
sqlContext.parquetFile("file:///var/tmp/demo/people").registerAsTable("people")
sqlContext.parquetFile("file:///var/tmp/demo/likes").registerAsTable("likes")
sql("select * from people").collect.foreach(println)
/*
graphx basics
*/
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
/*
You need a unique Long identifier in order to make vertices.
For demo purposes, the first character of the name is sufficiently unique.
*/
def id(s: String) = s.head.toLong
val vertices: RDD[(VertexId, String)] = sql("select name from people").map {
  case Row(name: String) =>
    id(name) -> name
}
/*
helper to make vertex ids more readable
*/
val nameOf = vertices.collect.toMap
/*
GraphX edges are directional, so if you want the equivalent of an undirected graph, you need to make edges in both directions (see the sketch after this definition).
Edges are assumed to have a label of some kind (e.g. a weight or a name); if you don't need one, just use 0.
*/
val edges: RDD[Edge[Int]] = sql("select name, likes from likes").map {
  case Row(name: String, likes: String) =>
    Edge(id(name), id(likes), 0)
}
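/*
As mentioned above, if you actually wanted the undirected equivalent, one sketch is to flatMap and
emit each edge in both directions (kept separate here so the directed graph below still matches
the original walkthrough):
*/
val undirectedEdges: RDD[Edge[Int]] = sql("select name, likes from likes").flatMap {
  case Row(name: String, likes: String) =>
    Seq(Edge(id(name), id(likes), 0), Edge(id(likes), id(name), 0))
}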
/*
Passing a default avoids errors when an edge refers to an unknown vertex
*/
val default = "UNKNOWN"
val graph = Graph(vertices, edges, default)
/*
Now that you have a graph object, you can perform operations on it:
*/
val degrees = graph.degrees.collect.sortWith((a, b) => (a._2 > b._2))
println(s"""
the vertex with the highest degree is "${nameOf(degrees(0)._1)}",
having ${degrees(0)._2} relationships
""")
val scc = graph.stronglyConnectedComponents(numIter = 5)
scc.vertices.map {
  case (member, leader) => s"${nameOf(member)} is in ${nameOf(leader)}'s clique"
}.collect.foreach(println)
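/*
Another built-in algorithm you could try, not part of the original walkthrough: connectedComponents
labels each vertex with the lowest vertex id in its component, so with this data everyone should
land in a single component. nameOf.getOrElse guards against ids that only exist via the default vertex.
*/
graph.connectedComponents().vertices.map {
  case (member, lowest) =>
    s"${nameOf.getOrElse(member, default)} is in ${nameOf.getOrElse(lowest, default)}'s component"
}.collect.foreach(println)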