@ikwattro
Forked from jexp/spark-neo4j-pagerank.scala
Created October 14, 2015 09:37
DBPedia in Neo4j -> Read from Neo4j -> Run PageRank (5 iterations) -> Write back to Neo4j
// running Spark on a large single machine
// 6 workers with 12G RAM each -> 72G total, plus 8G for the driver -> 80G RAM in total
// the machine has 6 physical CPUs
// the jar contains just AnormCypher + its dependencies
neo@neo:/tmp/spark$ bin/spark-shell --jars ../neo4j/target/scala-2.10/Neo4j-Spark-Demo-assembly-1.0.jar --driver-memory 8G --executor-memory 12G --master local[6]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.0
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.
import org.anormcypher._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
var start = System.currentTimeMillis
val total = 100000000
val batch = total/1000000
// batch: Int = 100
// read the relationships in 100 batches of 1M rows each, using skip <offset> limit 1M,
// otherwise Spark runs into OOM trying to keep all 100M rows at once
val links = sc.range(0,batch).repartition(batch).mapPartitionsWithIndex( (i,p) => {
  val dbConn = Neo4jREST("localhost", 9474, "/db/data/", "neo4j", "test")
  val q = "MATCH (p1:Page)-[:Link]->(p2) RETURN id(p1) as from, id(p2) as to skip {skip} limit 1000000"
  p.flatMap( skip => {
    Cypher(q).on("skip"->skip*1000000).apply()(dbConn).map(row =>
      (row[Int]("from").toLong,row[Int]("to").toLong)
    )
  })
})
// links: org.apache.spark.rdd.RDD[(Long, Long)] = MapPartitionsRDD[6] at mapPartitionsWithIndex at <console>:34
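// optional check (sketch, not part of the original session): peek at a few
// (from,to) id pairs to confirm the Cypher-to-tuple mapping before caching
links.take(3).foreach(println)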
links.cache
// res0: links.type = MapPartitionsRDD[6] at mapPartitionsWithIndex at <console>:34
links.count
// res1: Long = 100000000 - 100M entries
(System.currentTimeMillis - start)/1000/60
// res2: Long = 9 minutes to read 100M rows in 100 batches
// a single big read would be faster, but Spark chokes on it
start = System.currentTimeMillis
// convert the (from,to) tuples into GraphX Edges
val edges = links.map( l => Edge(l._1,l._2, None))
// edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[None.type]] = MapPartitionsRDD[7] at map at <console>:36
// create GraphX Graph
val g = Graph.fromEdges(edges,"none")
// g: org.apache.spark.graphx.Graph[String,None.type] = org.apache.spark.graphx.impl.GraphImpl@58d5bdb0
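// optional check (sketch, not part of the original session): graph sizes;
// note these trigger a full pass over the cached edges, so they add runtime
val numEdges = g.numEdges     // should equal links.count (100M)
val numVerts = g.numVertices  // distinct page ids referenced by the links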
// alternative PageRank invocation: run until convergence with a tolerance of 0.0001
// val v = g.pageRank(0.0001).vertices
// run 5 fixed iterations of PageRank
val v = PageRank.run(g, 5).vertices
// v: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[92] at RDD at VertexRDD.scala:57
(System.currentTimeMillis - start)/1000/60
// res3: Long = 122 minutes to run pagerank on GraphX on 100M edges
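// optional (sketch, not part of the original session): inspect the ten highest-ranked
// vertex ids straight from the VertexRDD; note this sorts all ~12M (id, rank) pairs
val top10 = v.sortBy(_._2, ascending = false).take(10)
top10.foreach { case (id, rank) => println(s"$id -> $rank") }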
start = System.currentTimeMillis
// partition the results into 1000 partitions to write data back concurrently to Neo4j
val res = v.repartition(total/100000).mapPartitions( part => {
  val localConn = Neo4jREST("localhost", 9474, "/db/data/", "neo4j", "test")
  val updateStmt = Cypher("UNWIND {updates} as update MATCH (p) where id(p) = update.id SET p.pagerank = update.rank")
  // materialize the partition before sending it, otherwise execute() exhausts the
  // iterator and part.size would report 0 afterwards
  val updates = part.map( v => Map("id"->v._1.toLong, "rank" -> v._2.toDouble)).toSeq
  updateStmt.on("updates"->updates).execute()(localConn)
  Iterator(updates.size)
})
// res: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[103] at mapPartitions at <console>:42
res.count
// res4: Long = 1000
(System.currentTimeMillis - start)/1000/60
// res5: Long = 7 minutes to write page-rank data back to Neo4j
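// optional verification (sketch, not part of the original session): count how many
// pages actually received a pagerank property, using the same AnormCypher connection style
val checkConn = Neo4jREST("localhost", 9474, "/db/data/", "neo4j", "test")
val withRank = Cypher("MATCH (p:Page) WHERE p.pagerank IS NOT NULL RETURN count(p) as c")
  .apply()(checkConn).map(row => row[Int]("c")).head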