// Code for Michael Malak's Spark Summit 2016 presentation
// "Finding Graph Isomorphisms in GraphX and GraphFrames"
// Additional code can be downloaded from
// https://www.manning.com/books/spark-graphx-in-action
// * readRdf() for GraphX
// * EXAMPLE: Missing <exports> for Canada (using GraphX SVD++)
// * EXAMPLE: Missing <influences> for well-known philosophers (using GraphFrames)

// Shell command to launch Spark Shell with GraphFrames jar
// ~/Downloads/spark-1.6.0-bin-hadoop2.3/bin/spark-shell --driver-memory 13g --driver-cores 4 --jars ~/graphframes/target/scala-2.10/graphframes_2.10-0.0.1-SNAPSHOT.jar
// readRdfDf (for GraphFrames)
//============================
import org.graphframes._
import org.apache.spark.sql.Row
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._
// spark-shell provides this implicitly; repeated here because the
// $"colname" column syntax below depends on it
import sqlContext.implicits._
def readRdfDf(sc:org.apache.spark.SparkContext, filename:String) = {
  // each input line is: rdfId <TAB> subject <TAB> predicate <TAB> object
  val r = sc.textFile(filename).map(_.split("\t"))
  // vertices are the distinct subjects and objects, assigned Long ids
  val v = r.map(_(1)).union(r.map(_(3))).distinct.zipWithIndex.map(
            x => Row(x._2,x._1)).cache
  val stv = StructType(StructField("id",LongType) ::
                       StructField("attr",StringType) :: Nil)
  val vdf = sqlContext.createDataFrame(v,stv).cache
  val str = StructType(StructField("rdfId",StringType) ::
                       StructField("subject",StringType) ::
                       StructField("predicate",StringType) ::
                       StructField("object",StringType) :: Nil)
  // edges translate the subject and object strings into the new vertex ids
  val edf = sqlContext.createDataFrame(r.map(Row.fromSeq(_)),str)
    .join(vdf, $"subject" === $"attr")
    .selectExpr("id AS src", "predicate", "object")
    .join(vdf, $"object" === $"attr")
    .selectExpr("src", "id AS dst", "predicate AS attr")
  v.unpersist(false)
  GraphFrame(vdf,edf)
}
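// Quick smoke test (a sketch: the names and facts below are made up, but
// follow the 4-column rdfId/subject/predicate/object layout readRdfDf expects)
val tmp = java.io.File.createTempFile("rdf-sample", ".tsv")
val pw = new java.io.PrintWriter(tmp)
pw.println("<id_1>\t<Ann>\t<hasChild>\t<Cal>")
pw.println("<id_2>\t<Bob>\t<hasChild>\t<Cal>")
pw.close
val sample = readRdfDf(sc, "file://" + tmp.getAbsolutePath)
sample.vertices.show  // 3 vertices: <Ann>, <Bob>, <Cal>, with Long ids
sample.edges.show     // 2 edges: src, dst, attr = <hasChild>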
// mergeGraphs (for GraphFrames)
//==============================
def mergeGraphs(g1:GraphFrame, g2:GraphFrame) = {
  // union the two vertex sets on the string attr and assign fresh Long ids
  val vunion = g1.vertices.sqlContext.createDataFrame(
    g1.vertices.select($"attr").unionAll(g2.vertices.select($"attr")).distinct.rdd.map(_(0)).zipWithIndex.map(t => Row.fromTuple(t.swap)),
    new StructType(Array(StructField("id",LongType),StructField("attr",StringType)))).cache
  // rewrite a graph's edges in terms of the new vertex ids
  def edgesWithNewVertexIds(gf:GraphFrame) = {
    gf.triplets
      .join(vunion, $"src.attr" === $"attr")
      .selectExpr("id AS src","edge.attr AS edgeattr","dst.attr AS dstattr")
      .join(vunion, $"dstattr" === $"attr")
      .selectExpr("src","id AS dst","edgeattr AS attr")
  }
  GraphFrame(vunion, edgesWithNewVertexIds(g1).unionAll(edgesWithNewVertexIds(g2)))
}
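// Sanity-check sketch, reusing the `sample` graph from the smoke test above:
// merging a graph with itself leaves the vertex set unchanged (vertices are
// unioned on attr), while the edges are simply concatenated, so duplicates
// remain until .distinct is applied (as mergeIntoYago() does below)
val doubled = mergeGraphs(sample, sample)
assert(doubled.vertices.count == sample.vertices.count)
doubled.edges.count  // twice sample.edges.count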
// Read YAGO
//==========
var yago = readRdfDf(sc, "s3n://yago3/yagoFacts.tsv")

def mergeIntoYago(filename:String):Unit = {
  val r = readRdfDf(sc, filename)
  r.vertices.cache
  r.edges.cache
  val g = mergeGraphs(yago, r)
  g.vertices.sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
  // repartition() returns a new DataFrame rather than modifying the old one,
  // so keep the repartitioned (and deduplicated) results
  val g2 = GraphFrame(g.vertices.repartition(1000),
                      g.edges.distinct.repartition(1000))
  g2.vertices.cache
  g2.vertices.count  // counts force materialization of the caches
  g2.edges.cache
  g2.edges.count
  yago.vertices.unpersist
  yago.edges.unpersist
  r.vertices.unpersist
  r.edges.unpersist
  yago = g2
}
mergeIntoYago("s3n://yago3/yagoLabels.tsv")
mergeIntoYago("s3n://yago3/yagoTaxonomy.tsv")
// removeSingletons()
// used by filterEdges()
//======================
// keep only the vertices that still appear as the src or dst of some edge
def removeSingletons(g:GraphFrame) =
  GraphFrame(g.vertices.sqlContext.createDataFrame(
               g.triplets.select("src").map(_.getStruct(0))
                .union(g.triplets.select("dst").map(_.getStruct(0)))
                .distinct,
               g.vertices.schema),
             g.edges)

// filterEdges()
// for prefiltering prior to running a GraphFrames query
//======================================================
def filterEdges(g:GraphFrame, condition:Column) =
  removeSingletons(GraphFrame(g.vertices, g.edges.where(condition)))
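// Usage sketch: shrink YAGO down to just the <hasChild> edges (the predicate
// the <isMarriedTo> examples below match on) before an expensive motif query
val hasChildGraph = filterEdges(yago, $"attr" === "<hasChild>")
hasChildGraph.edges.count     // only <hasChild> edges survive
hasChildGraph.vertices.count  // vertices left with no edges are dropped too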
// EXAMPLE: General 3-vertex rule mining
//======================================
// for each (e1 predicate, e2 predicate, terminal vertex v3), compute how often
// an e1-labeled edge leads, via some v2, to an e2-labeled edge into that
// particular v3, as a fraction of all e1-labeled edges
yago.find("()-[e1]->(v2); (v2)-[e2]->(v3)")
    .distinct
    .selectExpr("e1.attr AS e1attr", "e2.attr AS e2attr", "v3.id AS v3id", "v3.attr AS v3attr")
    .groupBy("e1attr", "e2attr", "v3id", "v3attr")
    .count
    .join(yago.edges.groupBy("attr")
                    .count
                    .selectExpr("attr", "count AS e1count"),
          $"e1attr" === $"attr")
    .selectExpr("e1attr", "e2attr", "v3attr", "count/e1count AS ratio")
    .orderBy($"ratio".desc)
    .show(40)
// EXAMPLE: General 4-vertex rule mining
// (incomplete, ran out of memory)
//======================================
val y1 = yago.find("(v1)-[e3]->(v4); (v1)-[e1]->(v2); (v2)-[e2]->(v3)")
             .distinct
             .selectExpr("e3.attr AS e3attr", "e1.attr AS e1attr", "e2.attr AS e2attr", "v3.id AS v3id", "v3.attr AS v3attr")
             .groupBy("e3attr", "e1attr", "e2attr", "v3id", "v3attr")
             .count
             .cache
// EXAMPLE: Find missing <isMarriedTo> | |
// (first attempt, results in a lot of false positives due to name aliases) | |
val df = y2.find("(parent1)-[e1]->(child); (parent2)-[e2]->(child); !(parent1)-[]->(parent2)") | |
.filter(($"parent1" !== $"parent2") && ($"e1.attr" === "<hasChild>") && ($"e2.attr" === "<hasChild>")) | |
.selectExpr("parent1.attr AS parent1", "parent2.attr AS parent2", "child.attr AS child") | |
.cache | |
df.show | |
// EXAMPLE: Find missing <isMarriedTo> | |
// (better, finds the Osbournes) | |
val df = y2.find("(parent1)-[e1]->(child); (parent2)-[e2]->(child); (parent1)-[]->(p1gender); (parent2)-[]->(p2gender)") | |
.filter(($"p1gender.attr" === "<male>") && ($"p2gender.attr" === "<female>") && ($"e1.attr" === "<hasChild>") && ($"e2.attr" === "<hasChild>")) | |
.selectExpr("parent1.attr AS parent1", "parent2.attr AS parent2", "child.attr AS child") | |
.cache | |
df.show |