// Spark code to find distances to reachable source vertices using GraphX.
// See http://apache-spark-user-list.1001560.n3.nabble.com/counting-degrees-graphx-td6370.html
import org.apache.spark.graphx._
import scala.collection.immutable.Map

val vertexArray = Array(
  (1L, ("101", "x")),
  (2L, ("102", "y")),
  (3L, ("103", "y")))
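// The gist preview is truncated above, so the rest is a minimal continuation
// sketch, not the original gist's code. It assumes the "x" tag marks source
// vertices and uses hypothetical edges; the built-in ShortestPaths algorithm
// then gives every vertex a map from each reachable source to its hop count.
import org.apache.spark.graphx.lib.ShortestPaths

val edgeArray = Array(Edge(2L, 1L, 1), Edge(3L, 2L, 1)) // hypothetical edges
val graph = Graph(sc.parallelize(vertexArray), sc.parallelize(edgeArray))
val sources = graph.vertices
  .filter { case (_, (_, tag)) => tag == "x" }
  .map(_._1).collect().toSeq
val distances = ShortestPaths.run(graph, sources).vertices
distances.collect().foreach(println) // e.g. (2, Map(1 -> 1)), (3, Map(1 -> 2))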

import scala.language.higherKinds

// Common interface of RDD and DStream. Note the Coll type parameter, which will either be RDD or DStream.
trait DistributedCollection[A, Coll[_]] {
  def map[B](f: A => B): Coll[B]
}

class RDD[A](val x: A) extends DistributedCollection[A, RDD] {
  def map[B](f: A => B): RDD[B] = new RDD(f(x))
}
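// The preview cuts off here; a sketch of the presumable counterpart and of
// code written once against the common interface (these names are
// assumptions, not the original gist's code):
class DStream[A](val x: A) extends DistributedCollection[A, DStream] {
  def map[B](f: A => B): DStream[B] = new DStream(f(x))
}

// Generic code can abstract over both collection types:
def addOne[Coll[_]](xs: DistributedCollection[Int, Coll]): Coll[Int] =
  xs.map(_ + 1)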

ankurdave@ankur-mbp-2:~/repos/spark$ ~/repos/spark/ec2/spark-ec2 -s 1 -k ankur-mbp-2 -i ~/.ssh/ankur-mbp-2.pem -t m3.medium -z us-east-1e launch test-cluster
Setting up security groups...
Creating security group test-cluster-master
Creating security group test-cluster-slaves
Searching for existing cluster test-cluster...
Spark AMI: ami-5bb18832
Launching instances...
Launched 1 slaves in us-east-1e, regid = r-39c8dd48
Launched master in us-east-1e, regid = r-79cedb08
Waiting for instances to start up...

// Finds the connected component containing a particular vertex.
// In response to http://apache-spark-developers-list.1001551.n3.nabble.com/GraphX-some-vertex-with-specific-edge-td8436.html
import org.apache.spark.graphx._

// Construct the graph in the above example
val edges = sc.parallelize(List(
  Edge(1L, 2L, "e1"), Edge(2L, 3L, "e1"), Edge(3L, 4L, "e1")))
val g: Graph[Int, String] = Graph.fromEdges(edges, 0)
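// The preview is truncated; a minimal continuation sketch (choosing vertex 1L
// as the target is an assumption): label each vertex with its component id,
// then keep the vertices whose component matches the target's.
val cc = g.connectedComponents()
val target = cc.vertices.filter { case (id, _) => id == 1L }.map(_._2).first()
val members = cc.vertices.filter { case (_, comp) => comp == target }.map(_._1)
members.collect().foreach(println) // 1, 2, 3, 4: all in vertex 1's component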

#!/usr/bin/env bash
SPARK_JAVA_OPTS+=" -Dspark.local.dir=/mnt/spark,/mnt2/spark"
export SPARK_JAVA_OPTS
export SPARK_MEM=58g

# Standalone cluster options
export SPARK_MASTER_OPTS=""
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_CORES=8

import org.apache.spark._
import org.apache.spark.graphx._

val triplets = sc.textFile(path).flatMap { line =>
  if (!line.isEmpty && line(0) != '#') {
    val lineArray = line.split("\\s+")
    if (lineArray.length < 2) {
      None
    } else {
      val t = new EdgeTriplet[String, String]
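      // The preview is truncated at this point; a plausible completion
      // (an assumption, not the original gist's code): parse the two
      // vertex ids and emit the triplet, skipping blanks and comments.
      t.srcId = lineArray(0).toLong
      t.dstId = lineArray(1).toLong
      Some(t)
    }
  } else {
    None
  }
}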

package org.apache.spark.graphx

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
import org.apache.spark.storage.StorageLevel

object MyGraphLoader extends Logging {
  def edgeListFile(
      sc: SparkContext,
      path: String,
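      // The remaining parameters are cut off in the preview. The stock
      // GraphLoader.edgeListFile takes the following ones, which a custom
      // loader would likely mirror (an assumption based on the imports above):
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[Int, Int] = {
    // Body elided in the preview.
    ???
  }
}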

import org.apache.spark.graphx._
import org.apache.spark.graphx.impl._
import org.apache.spark._

def time[A](desc: String)(f: => A): A = {
  val start = System.currentTimeMillis
  val result = f
  println(s"$desc: ${System.currentTimeMillis - start} ms")
  result
}
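// A usage sketch (the edge-list path is hypothetical). The by-name parameter
// (f: => A) defers evaluation, so the block executes inside the timing window:
val g = time("load graph") {
  GraphLoader.edgeListFile(sc, "/path/to/edges.txt")
}
val numEdges = time("count edges") { g.edges.count() }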

# Launch the cluster
~/repos/spark/ec2/spark-ec2 -s 16 -w 500 -k ankur-mbp-2 -i ~/.ssh/ankur-mbp-2.pem -t r3.2xlarge -z us-east-1d --spot-price=1 launch graphx-16-r3.2xlarge

# After connecting to the cluster, run the following:
~/spark/sbin/stop-all.sh && cd /mnt && git clone https://github.com/ankurdave/spark -b edges-on-disk && cd /mnt/spark && mkdir -p conf && cp ~/spark/conf/* conf/
echo "spark.core.connection.ack.wait.timeout 100000000" >> /mnt/spark/conf/spark-defaults.conf
echo "spark.storage.blockManagerSlaveTimeoutMs 100000000" >> /mnt/spark/conf/spark-defaults.conf
echo "spark.akka.timeout 100000" >> /mnt/spark/conf/spark-defaults.conf

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row

import scala.reflect.ClassTag

// Take edges and build a graph. Note the edge attribute must be the unit
// value (), not the Unit companion object.
def a(vertices: RDD[(Long, Row)], edges: RDD[(Long, Long)]): Graph[Row, Unit] =
  Graph(vertices, edges.map(pair => Edge(pair._1, pair._2, ())))

// Run connected components on the graph
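// The preview is truncated here; a minimal continuation sketch (assumed, not
// the original gist's code): label each vertex with its component's id.
def b(graph: Graph[Row, Unit]): Graph[VertexId, Unit] =
  graph.connectedComponents()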