// Created October 28, 2014 19:32
// Save ankurdave/0394d47809297eea76ff to your computer and use it in GitHub Desktop.
// Modified version of GraphLoader for Spark 1.0 that allows setting the edge storage level.
// This file contains hidden or bidirectional Unicode text that may be interpreted or compiled
// differently than what appears below. To review, open the file in an editor that reveals
// hidden Unicode characters.
// Learn more about bidirectional Unicode characters.
package org.apache.spark.graphx

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
import org.apache.spark.storage.StorageLevel
/**
 * Variant of GraphX's `GraphLoader` whose `edgeListFile` lets the caller choose the
 * storage level used to persist the loaded edge partitions.
 */
object MyGraphLoader extends Logging {

  /**
   * Loads a graph from an edge-list text file.
   *
   * Each line is trimmed. Blank lines and lines starting with `#` are ignored. Every other line
   * is split on runs of whitespace, and its first two fields are parsed as the source and
   * destination vertex ids; any further fields are ignored. Lines with fewer than two fields are
   * logged with a warning and skipped. Every edge gets attribute `1`, and every vertex derived
   * from the edges gets attribute `1`.
   *
   * @param sc                   SparkContext used to read `path`
   * @param path                 location of the edge-list file, passed to `sc.textFile`
   * @param canonicalOrientation if true, each edge is stored with the smaller id as its source
   * @param minEdgePartitions    partition hint passed to `textFile`; the lines are then coalesced
   *                             to at most this many partitions
   * @param storageLevel         storage level used to persist the edge partitions
   * @return the loaded graph
   * @throws NumberFormatException (raised while loading) if an id field is not a valid Long
   */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1,
      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
    : Graph[Int, Int] = {
    val startTime = System.currentTimeMillis

    // Parse the edge data table directly into edge partitions.
    val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
      val builder = new EdgePartitionBuilder[Int, Int]
      iter.foreach { line =>
        // Trim so that leading whitespace, whitespace-only lines and trailing '\r' (CRLF files)
        // do not yield empty or dirty fields that would make toLong throw.
        val trimmed = line.trim
        if (trimmed.nonEmpty && !trimmed.startsWith("#")) {
          val lineArray = trimmed.split("\\s+")
          if (lineArray.length < 2) {
            // Skip malformed lines; reading lineArray(1) here would throw and fail the task.
            logWarning("Invalid line: " + line)
          } else {
            val srcId = lineArray(0).toLong
            val dstId = lineArray(1).toLong
            if (canonicalOrientation && srcId > dstId) {
              builder.add(dstId, srcId, 1)
            } else {
              builder.add(srcId, dstId, 1)
            }
          }
        }
      }
      Iterator((pid, builder.toEdgePartition))
    }.persist(storageLevel)

    // Materialize the edges now so the timing below reflects the actual load.
    edges.count()
    logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

    val edgeRDD = new EdgeRDD(edges)
    val vertexRDD = VertexRDD.fromEdges(edgeRDD, edgeRDD.partitions.size, defaultVal = 1)
    GraphImpl.fromExistingRDDs(vertexRDD, edgeRDD)
  }
}
// Sign up for free to join this conversation on GitHub.
// Already have an account? Sign in to comment.