Last active
August 29, 2015 14:08
-
-
Save ankurdave/dc9f684b880d53ee162c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.graphx._ | |
import org.apache.spark.graphx.impl._ | |
import org.apache.spark._ | |
/** Evaluates the by-name computation `f` exactly once, prints
  * "<desc>: <elapsed> ms" using wall-clock milliseconds, and returns
  * whatever `f` produced.
  */
def time[A](desc: String)(f: => A): A = {
  val startMillis = System.currentTimeMillis
  val value = f
  val elapsedMillis = System.currentTimeMillis - startMillis
  println(s"$desc: $elapsedMillis ms")
  value
}
// Accumulates the parsed edges; every edge is added with attribute 1.0.
val builder = new FreshEdgePartitionBuilder[Double, Double]
// NOTE(review): machine-specific input path — adjust before running.
val path = "/Users/ankurdave/Downloads/uk-2007-05-coalesced-part-00137"
// Reads the edge list at `path` into `builder` (timed as "load edges").
// Blank lines and lines starting with '#' are skipped; every other line must
// contain at least two whitespace-separated ids (src, dst) — extra columns are
// ignored. The Source is closed even when a line fails to parse, so the file
// handle is no longer leaked. The value of `edges` is Unit: the edges live in
// `builder`.
val edges = time("load edges") {
  val source = scala.io.Source.fromFile(path)
  try {
    source.getLines.foreach { line =>
      if (!line.isEmpty && line(0) != '#') {
        val lineArray = line.split("\\s+")
        if (lineArray.length < 2) {
          throw new Exception("Invalid line: " + line)
        }
        val srcId = lineArray(0).toLong
        val dstId = lineArray(1).toLong
        builder.add(srcId, dstId, 1.0)
      }
    }
  } finally {
    source.close()
  }
}
// Freeze everything accumulated in `builder` into an edge partition.
val edgePartition: EdgePartition[Double, Double] =
  time("finalize edge partition")(builder.toEdgePartition)

// Give every vertex known to the partition's global->local index the
// attribute 1.0, producing a partition whose vertexAttrs are populated.
val tripletPartition: EdgePartition[Double, Double] =
  time("upgrade edge partition") {
    val allOnes = edgePartition.global2local.iterator.map { case (vid, _) => (vid, 1.0) }
    edgePartition.updateVertices(allOnes)
  }
/** Emits a single message to the edge's destination: the source attribute
  * scaled by the edge attribute.
  */
def mapFunc(e: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = {
  val msg = e.srcAttr * e.attr
  Iterator.single(e.dstId -> msg)
}
/** Merges two messages bound for the same vertex by summing them. */
def reduceFunc(a: Double, b: Double): Double = {
  val sum = a + b
  sum
}
// Timed pre-aggregation: scan every edge of `tripletPartition`, run mapFunc on it,
// and combine the resulting messages per local vertex id with reduceFunc.
time("preAgg") {
  val numVertices = tripletPartition.vertexAttrs.length
  // aggregates(localId) is meaningful only once bitset.get(localId) is true.
  val aggregates = new Array[Double](numVertices)
  val bitset = new org.apache.spark.util.collection.BitSet(numVertices)
  val numEdges = tripletPartition.srcIds.length
  // One triplet object is created up front and its fields are overwritten for
  // each edge. Only srcAttr is filled in (not dstAttr), which is all mapFunc reads.
  val edge = new EdgeTriplet[Double, Double]
  var i = 0
  while (i < numEdges) {
    edge.localSrcId = tripletPartition.localSrcIds(i)
    edge.localDstId = tripletPartition.localDstIds(i)
    edge.srcId = tripletPartition.local2global(edge.localSrcId)
    edge.dstId = tripletPartition.local2global(edge.localDstId)
    edge.attr = tripletPartition.data(i)
    edge.srcAttr = tripletPartition.vertexAttrs(edge.localSrcId)
    mapFunc(edge).foreach { kv =>
      val globalId = kv._1
      // Assumes every message targets one of this edge's endpoints: anything not
      // addressed to the source is treated as addressed to the destination.
      val localId = if (globalId == edge.srcId) edge.localSrcId else edge.localDstId
      val msg = kv._2
      if (bitset.get(localId)) {
        aggregates(localId) = reduceFunc(aggregates(localId), msg)
      } else {
        aggregates(localId) = msg
        bitset.set(localId)
      }
    }
    i += 1
  }
  // Drain the (globalId, aggregate) pairs so producing them is part of the timing;
  // the pairs are discarded. Local ids come from tripletPartition, so map them back
  // through tripletPartition's own local2global (the loop above uses the same one).
  bitset.iterator
    .map { localId => (tripletPartition.local2global(localId), aggregates(localId)) }
    .foreach(_ => ())
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.graphx._ | |
import org.apache.spark.graphx.impl._ | |
import org.apache.spark._ | |
/** Runs `f` (passed by name, evaluated once), prints how many wall-clock
  * milliseconds it took as "<desc>: <elapsed> ms", and hands back its result.
  */
def time[A](desc: String)(f: => A): A = {
  val t0 = System.currentTimeMillis
  val out = f
  val elapsed = System.currentTimeMillis - t0
  println(s"$desc: $elapsed ms")
  out
}
// Accumulates the parsed edges; every edge is added with attribute 1.0.
val builder = new EdgePartitionBuilder[Double, Double]
// NOTE(review): machine-specific input path — adjust before running.
val path = "/mnt/part-00005"
// Reads the edge list at `path` into `builder` (timed as "load edges").
// Blank lines and lines starting with '#' are skipped; every other line must
// contain at least two whitespace-separated ids (src, dst) — extra columns are
// ignored. The Source is closed even when a line fails to parse, so the file
// handle is no longer leaked. The value of `edges` is Unit: the edges live in
// `builder`.
val edges = time("load edges") {
  val source = scala.io.Source.fromFile(path)
  try {
    source.getLines.foreach { line =>
      if (!line.isEmpty && line(0) != '#') {
        val lineArray = line.split("\\s+")
        if (lineArray.length < 2) {
          throw new Exception("Invalid line: " + line)
        }
        val srcId = lineArray(0).toLong
        val dstId = lineArray(1).toLong
        builder.add(srcId, dstId, 1.0)
      }
    }
  } finally {
    source.close()
  }
}
// Freeze the builder's accumulated edges into an edge partition.
val edgePartition: EdgePartition[Double, Double] =
  time("finalize edge partition")(builder.toEdgePartition)

// Routing table built from this single edge partition's messages.
// NOTE(review): 0 is presumably this edge partition's id and 1 the total number
// of edge partitions — confirm against RoutingTablePartition.
val routingTablePartition =
  time("build routing table") {
    val routingMsgs = RoutingTablePartition.edgePartitionToMsgs(0, edgePartition)
    RoutingTablePartition.fromMsgs(1, routingMsgs)
  }

// Vertex partition built from no explicit vertices, with 1.0 passed as the
// default attribute (presumably applied to every vertex in the routing table).
val vertexPartition: ShippableVertexPartition[Double] =
  time("build vertex partition") {
    ShippableVertexPartition(Iterator.empty, routingTablePartition, 1.0)
  }

// Copy the vertex attributes into the edge partition so triplets can read them.
val tripletPartition: EdgePartition[Double, Double] =
  time("upgrade edge partition")(edgePartition.updateVertices(vertexPartition.iterator))
/** Produces one message for the edge's destination vertex: the source
  * attribute multiplied by the edge attribute.
  */
def mapFunc(e: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = {
  val scaled = e.srcAttr * e.attr
  Iterator.single(e.dstId -> scaled)
}
/** Combines two messages for the same vertex: their sum. */
def reduceFunc(a: Double, b: Double): Double = {
  val total = a + b
  total
}
// Timed pre-aggregation: walk the partition's edges as triplets, emit mapFunc's
// messages, and combine them per vertex via the partition's vertex index using
// reduceFunc. `result` is the `.iterator` of the aggregated vertex partition.
val result =
  time("preAgg") {
    // NOTE(review): (true, false) presumably means "include source attributes,
    // skip destination attributes" — matching mapFunc, which reads only srcAttr.
    val triplets = tripletPartition.upgradeIterator(tripletPartition.iterator, true, false)
    val mapOutputs = triplets.flatMap(e => mapFunc(e))
    val aggregated = tripletPartition.vertices.aggregateUsingIndex(mapOutputs, reduceFunc)
    aggregated.iterator
  }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <climits>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <unordered_map>
#include <vector>
// Usage:
// g++ -Wall -O3 mrTriplets-localids.cpp -o mrTriplets-localids
// ./mrTriplets-localids 14167504 < ~/Downloads/uk-2007-05-coalesced-part-00137
//
// Array-based mrTriplets benchmark. Reads up to <num_edges> "src<TAB>dst" pairs
// of global vertex ids from stdin and renumbers the vertices to dense local ids
// (0, 1, 2, ... in order of first appearance). It then times one pass that adds
// vertex_attrs[src] * attrs[edge] into vertex_preagg[dst] for every edge.
int main(int argc, char* argv[]) {
  if (argc < 2) {
    std::fprintf(stderr, "Usage: %s <num_edges> < edge_list\n", argv[0]);
    return 1;
  }
  // strtol rather than atoi so a malformed or out-of-range count is reported
  // instead of silently becoming 0 or an overflowed value.
  char* end = nullptr;
  const long parsed = std::strtol(argv[1], &end, 10);
  if (end == argv[1] || *end != '\0' || parsed < 0 || parsed > INT_MAX) {
    std::fprintf(stderr, "Invalid edge count: %s\n", argv[1]);
    return 1;
  }
  int num_edges = static_cast<int>(parsed);

  std::vector<int> srcIds(num_edges);
  std::vector<int> dstIds(num_edges);
  std::vector<double> attrs(num_edges, 1.0);
  // Global vertex id -> dense local id.
  std::unordered_map<long, int> vertex_index;
  // Next unused local id, i.e. the number of distinct vertices seen so far.
  int voffset = 0;
  // Returns the local id of global id `vid`, assigning the next free one on first
  // sight. emplace() replaces the original find() + operator[] double lookup.
  auto local_id = [&vertex_index, &voffset](long vid) {
    const auto result = vertex_index.emplace(vid, voffset);
    if (result.second) {
      voffset++;
    }
    return result.first->second;
  };

  std::printf("Loading...\n");
  long src = 0;
  long dst = 0;
  int loaded = 0;
  // Stop at EOF or on a malformed line instead of recording stale src/dst values.
  while (loaded < num_edges && std::scanf("%ld\t%ld", &src, &dst) == 2) {
    srcIds[loaded] = local_id(src);
    dstIds[loaded] = local_id(dst);
    loaded++;
  }
  if (loaded < num_edges) {
    std::fprintf(stderr, "Warning: expected %d edges but read only %d\n",
                 num_edges, loaded);
    num_edges = loaded;
  }

  std::vector<double> vertex_attrs(voffset, 1.0);
  std::vector<double> vertex_preagg(voffset, 0.0);
  std::printf("Scanning...\n");
  std::clock_t start_time = std::clock();
  for (int i = 0; i < num_edges; i++) {
    vertex_preagg[dstIds[i]] += vertex_attrs[srcIds[i]] * attrs[i];
  }
  std::clock_t end_time = std::clock();
  // Observe the results. If nothing ever reads vertex_preagg, an optimizing
  // compiler may legally delete the timed loop, and the reported time would
  // not measure the scan at all.
  double checksum = 0.0;
  for (double v : vertex_preagg) {
    checksum += v;
  }
  std::printf("Scanned %d edges in %f seconds\n",
              num_edges, (end_time - start_time) / static_cast<double>(CLOCKS_PER_SEC));
  std::printf("Checksum (sum of pre-aggregated values): %f\n", checksum);
  return 0;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <climits>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <unordered_map>
#include <vector>
// Usage:
// g++ -Wall -O3 mrTriplets.cpp -o mrTriplets
// ./mrTriplets 14167504 < ~/Downloads/uk-2007-05-coalesced-part-00137
//
// Hash-map mrTriplets benchmark: vertices are addressed by their global ids
// through std::unordered_map (no local-id renumbering). The timed pass sends
// vertices[src] * attrs[edge] to each edge's destination and sums the messages
// per destination in `preagg`.
int main(int argc, char* argv[]) {
  if (argc < 2) {
    std::fprintf(stderr, "Usage: %s <num_edges> < edge_list\n", argv[0]);
    return 1;
  }
  // strtol rather than atoi so a malformed or out-of-range count is reported
  // instead of silently becoming 0 or an overflowed value.
  char* end = nullptr;
  const long parsed = std::strtol(argv[1], &end, 10);
  if (end == argv[1] || *end != '\0' || parsed < 0 || parsed > INT_MAX) {
    std::fprintf(stderr, "Invalid edge count: %s\n", argv[1]);
    return 1;
  }
  int num_edges = static_cast<int>(parsed);

  std::vector<long> srcIds(num_edges);
  std::vector<long> dstIds(num_edges);
  std::vector<double> attrs(num_edges, 1.0);
  // Attribute of every vertex seen in the input (all 1.0); only read during the scan.
  std::unordered_map<long, double> vertices;
  // Per-destination sum of incoming messages. Only vertices that receive a
  // message get an entry.
  std::unordered_map<long, double> preagg;

  std::printf("Loading...\n");
  long src = 0;
  long dst = 0;
  int loaded = 0;
  // Stop at EOF or on a malformed line instead of recording stale src/dst values.
  while (loaded < num_edges && std::scanf("%ld\t%ld", &src, &dst) == 2) {
    srcIds[loaded] = src;
    dstIds[loaded] = dst;
    vertices[src] = 1.0;
    vertices[dst] = 1.0;
    loaded++;
  }
  if (loaded < num_edges) {
    std::fprintf(stderr, "Warning: expected %d edges but read only %d\n",
                 num_edges, loaded);
    num_edges = loaded;
  }

  std::printf("Scanning...\n");
  std::clock_t start_time = std::clock();
  for (int i = 0; i < num_edges; i++) {
    // Every source id was inserted while loading, so at() never throws here.
    const double msg = vertices.at(srcIds[i]) * attrs[i];
    // Aggregate into `preagg`, not `vertices`. The original wrote messages back
    // into the attribute map: its "new key" branch was dead (every dst was
    // already present) and later edges read attributes corrupted by earlier
    // messages.
    std::unordered_map<long, double>::iterator it = preagg.find(dstIds[i]);
    if (it == preagg.end()) {  // New key
      preagg.emplace(dstIds[i], msg);
    } else {  // Existing key: reuse the iterator instead of a second lookup
      it->second += msg;
    }
  }
  std::clock_t end_time = std::clock();
  // Observe the results so the timed work cannot be discarded as dead code.
  double checksum = 0.0;
  for (const auto& kv : preagg) {
    checksum += kv.second;
  }
  std::printf("Scanned %d edges in %f seconds\n",
              num_edges, (end_time - start_time) / static_cast<double>(CLOCKS_PER_SEC));
  std::printf("Checksum (sum of pre-aggregated values): %f\n", checksum);
  return 0;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment