DBPedia in Neo4j -> Read from Neo4j -> Run PageRank (5 iterations) -> Write back to Neo4j
// running spark on a large single machine
// 6 workers, with 12G RAM each -> 72G total and 8G for the driver -> 80G RAM in total
// the machine has 6 physical CPUs
// the jar contains just AnormCypher.org + Dependencies
neo@neo:/tmp/spark$ bin/spark-shell --jars ../neo4j/target/scala-2.10/Neo4j-Spark-Demo-assembly-1.0.jar --driver-memory 8G --executor-memory 12G --master local[6]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.
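// Aside (not part of the original session): the same resources could be set up programmatically
// when building the context in a standalone application instead of the shell. A minimal sketch,
// assuming the application is launched with enough driver heap already (driver memory cannot be
// raised from inside an already running JVM); the app name is hypothetical:
//
// import org.apache.spark.{SparkConf, SparkContext}
// val conf = new SparkConf()
//   .setAppName("Neo4j-Spark-PageRank")      // hypothetical app name
//   .setMaster("local[6]")                   // 6 worker threads on the single machine
//   .set("spark.executor.memory", "12g")
// val sc = new SparkContext(conf)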
import org.anormcypher._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

var start = System.currentTimeMillis
val total = 100000000
val batch = total/1000000
// batch: Int = 100

// read 100 batches of 1M relationship rows each, using skip <window> limit 1M,
// otherwise Spark runs into OOM keeping 100M rows at once
val links = sc.range(0,batch).repartition(batch).mapPartitionsWithIndex( (i,p) => {
  val dbConn = Neo4jREST("localhost", 9474, "/db/data/", "neo4j", "test")
  val q = "MATCH (p1:Page)-[:Link]->(p2) RETURN id(p1) as from, id(p2) as to skip {skip} limit 1000000"
  p.flatMap( skip => {
    Cypher(q).on("skip"->skip*1000000).apply()(dbConn).map(row =>
      (row[Int]("from").toLong,row[Int]("to").toLong)
    )
  })
})
// links: org.apache.spark.rdd.RDD[(Long, Long)] = MapPartitionsRDD[6] at mapPartitionsWithIndex at <console>:34

links.cache
// res0: links.type = MapPartitionsRDD[6] at mapPartitionsWithIndex at <console>:34
links.count
// res1: Long = 100000000 - 100M entries
(System.currentTimeMillis - start)/1000/60
// res2: Long = 9 minutes to read 100M rows in 100 batches
// would be faster as a single big read but spark chokes on that
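// Aside (a sketch, not from the original run): batching on id() ranges instead of SKIP/LIMIT
// avoids the server re-reading and discarding the skipped rows on every batch. The upper bound
// on node ids and the range size below are hypothetical and would have to be looked up first:
//
// val maxId = 13000000L   // hypothetical highest node id, e.g. from MATCH (p:Page) RETURN max(id(p))
// val rangeSize = 1000000L
// val linksByRange = sc.range(0, maxId/rangeSize + 1).repartition(batch).mapPartitions( p => {
//   val dbConn = Neo4jREST("localhost", 9474, "/db/data/", "neo4j", "test")
//   val q = "MATCH (p1:Page)-[:Link]->(p2) WHERE id(p1) >= {min} AND id(p1) < {max} RETURN id(p1) as from, id(p2) as to"
//   p.flatMap( r => {
//     Cypher(q).on("min" -> r*rangeSize, "max" -> (r+1)*rangeSize).apply()(dbConn).map(row =>
//       (row[Int]("from").toLong, row[Int]("to").toLong)
//     )
//   })
// })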
start = System.currentTimeMillis

// convert tuples into Edge's
val edges = links.map( l => Edge(l._1,l._2, None))
// edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[None.type]] = MapPartitionsRDD[7] at map at <console>:36

// create GraphX Graph
val g = Graph.fromEdges(edges,"none")
// g: org.apache.spark.graphx.Graph[String,None.type] = org.apache.spark.graphx.impl.GraphImpl@58d5bdb0

// alternative pageRank invocation
// val v = g.pageRank(.0001).vertices

// run 5 iterations of pagerank
val v = PageRank.run(g, 5).vertices
// v: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[92] at RDD at VertexRDD.scala:57
(System.currentTimeMillis - start)/1000/60
// res3: Long = 122 minutes to run pagerank on GraphX on 100M edges
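// Aside (not executed in the original session): v behaves like a plain RDD[(VertexId, Double)],
// so the highest-ranked node ids can be peeked at before writing anything back, e.g.:
//
// v.map{ case (id, rank) => (rank, id) }.top(10).foreach(println)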
start = System.currentTimeMillis

// partition the results into 1000 partitions to write data back concurrently to Neo4j
val res = v.repartition(total/100000).mapPartitions( part => {
  val localConn = Neo4jREST("localhost", 9474, "/db/data/", "neo4j", "test")
  val updateStmt = Cypher("UNWIND {updates} as update MATCH (p) where id(p) = update.id SET p.pagerank = update.rank")
  // materialize the partition so it can be sent as one parameter list and still be counted afterwards
  val updates = part.map( v => Map("id"->v._1.toLong, "rank" -> v._2.toDouble)).toList
  val count = updateStmt.on("updates"->updates).execute()(localConn)
  Iterator(updates.size)
})
// res: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[103] at mapPartitions at <console>:42
res.count
// res4: Long = 1000
(System.currentTimeMillis - start)/1000/60
// res5: Long = 7 minutes to write page-rank data back to Neo4j
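// Aside (a sketch, not part of the original session): one way to spot-check the write-back is to
// read the top pages by the new pagerank property through the same AnormCypher connection.
// The p.title property is an assumption about the DBPedia import; use whatever name property exists:
//
// val checkConn = Neo4jREST("localhost", 9474, "/db/data/", "neo4j", "test")
// Cypher("MATCH (p:Page) WHERE p.pagerank IS NOT NULL RETURN p.title as title, p.pagerank as rank ORDER BY rank DESC LIMIT 10")
//   .apply()(checkConn)
//   .foreach( row => println(row[String]("title") + " " + row[Double]("rank")) )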
Hi,
I am trying to run this code on Spark; however, I am receiving the error:
org.anormcypher.Neo4jREST.type does not take parameters
Could you please help me out with this?
Thanks and regards,
Kavita
Note: if I run this in Neo4j as an extension, it takes 60-90 seconds, including reading the data, computing PageRank and writing the ranks back transactionally. :)