Created
September 5, 2015 21:16
-
-
Save ankurdave/2f06008d9f9526c207eb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.SparkConf | |
import org.apache.spark.SparkContext | |
import org.apache.spark.sql.SQLContext | |
import org.apache.spark.sql.DataFrame | |
// Build the Spark configuration explicitly and hand it to the context, so `conf` is the
// configuration actually in effect (master "local", application name "test").
val conf = new SparkConf().setMaster("local").setAppName("test")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Vertex table: four rows with columns (id, attr, number).
val v = sqlContext.createDataFrame(List(
  (0L, "a", 1),
  (1L, "b", 2),
  (2L, "c", 3),
  (3L, "d", 4))).toDF("id", "attr", "number")

// Edge table: four directed pairs with columns (src_id, dst_id).
// The pairs 0->1, 1->2 and 2->0 close a cycle over vertices 0, 1 and 2.
val e = sqlContext.createDataFrame(List(
  (0L, 1L),
  (1L, 2L),
  (2L, 3L),
  (2L, 0L))).toDF("src_id", "dst_id")

// NOTE(review): GraphFrame is not imported in the visible lines; it has to be in scope
// already (e.g. provided by the shell or build this script runs in) — confirm.
val g = GraphFrame(v, e)

// Pattern with a single named vertex.
val vertices = g.find("(a)")
vertices.show()

// Pattern with one anonymous edge between two named vertices.
val triplets = g.find("(u)-[]->(v)")
triplets.show()
triplets.explain(true)

// Three chained edges a -> b -> c -> a, keeping only the three id columns.
val triangles = g
  .find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(a)")
  .select("a_id", "b_id", "c_id")
triangles.show()
triangles.explain(true)
/**
 * Prints how many `Join` nodes appear in the analyzed logical plan of `df` and how many
 * appear in its optimized logical plan.
 *
 * @param df the DataFrame whose query plans are inspected
 */
def countJoins(df: DataFrame) = {
  import org.apache.spark.sql.catalyst.plans.logical.Join

  val execution = df.queryExecution

  // Join nodes in the plan as analyzed, i.e. before the optimizer has run.
  val joinsBefore = execution.analyzed.collect { case join: Join => join }.size

  // Join nodes in the plan the optimizer produced.
  val joinsAfter = execution.optimizedPlan.collect { case join: Join => join }.size

  println(s"Optimization reduced number of joins from $joinsBefore to $joinsAfter")
}
countJoins(triangles)

// Two chained edges u -> v -> w with no further constraints. Bound to its own name so it
// does not clash with `fof` below: two `val fof` definitions in the same scope do not
// compile when the script is built or pasted as one unit. No action is invoked on it here.
val fofUnfiltered = g
  .find("(u)-[]->(v); (v)-[]->(w)")
  .select("u_id", "v_id", "w_id")

// Same two-edge chain, with two additional negated edge terms between u and w.
val fof = g
  .find("(u)-[]->(v); (v)-[]->(w); !(u)-[]->(w); !(w)-[]->(u)")
  .select("u_id", "v_id", "w_id")
fof.show()
fof.explain(true)
countJoins(fof)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment