This Scala code benchmarks three ways of computing the Euclidean distance between two Spark RDDs: the classic map-reduce pattern, treeReduce, and treeAggregate.
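For reference, all three variants compute the same quantity, the Euclidean distance between the two input vectors x and y:

$$ d(x, y) = \sqrt{\sum_{i=1}^{n} (x_i - y_i)^2} $$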
import org.apache.commons.lang.SystemUtils
import org.apache.spark.mllib.random.RandomRDDs._
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.math.sqrt

/**
  * Created by Umberto on 08/02/2017.
  */
object TestPerformance {

  def main(args: Array[String]) {

    // Elapsed times (in ms) for each of the 20 runs of every variant
    val mapReduceTimeArr: Array[Double] = Array.ofDim(20)
    val treeReduceTimeArr: Array[Double] = Array.ofDim(20)
    val treeAggregateTimeArr: Array[Double] = Array.ofDim(20)

    // Set Windows System property
    if (SystemUtils.IS_OS_WINDOWS) {
      System.setProperty("hadoop.home.dir", "c:/winutil/")
    }

    // Spark setup
    val config = new SparkConf().setAppName("TestStack").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(config)
    val sql: SQLContext = new SQLContext(sc)

    // Generate two random double RDDs, each containing 1 million i.i.d. values drawn from the
    // standard normal distribution `N(0, 1)`, evenly distributed in 5 partitions.
    val input1 = normalRDD(sc, 1000000L, 5)
    val input2 = normalRDD(sc, 1000000L, 5)

    //val input1 = sc.parallelize(List[Double](1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
    //val input2 = sc.parallelize(List[Double](1.0, 2.0, 3.0, 4.0, 3.0, 7.0))

    val xy = input1.zip(input2).cache()
    // To materialize the RDD
    xy.count()

    // Variant 1: map-reduce - square the element-wise differences, then sum them with a flat reduce
    for (i <- 0 until 20) {
      val t1 = System.nanoTime()
      val euclideanDistanceMapRed = sqrt(xy.map { case (v1, v2) => (v1 - v2) * (v1 - v2) }.reduce(_ + _))
      val t11 = System.nanoTime()
      println("Map-Reduce - Euclidean Distance " + euclideanDistanceMapRed)
      mapReduceTimeArr(i) = (t11 - t1) / 1000000.0
      println("Map-Reduce - Elapsed time: " + (t11 - t1) / 1000000.0 + "ms")
    }

    // Variant 2: same map step, but partial sums are combined with treeReduce (multi-level aggregation)
    for (i <- 0 until 20) {
      val t2 = System.nanoTime()
      val euclideanDistanceTreeRed = sqrt(xy.map { case (v1, v2) => (v1 - v2) * (v1 - v2) }.treeReduce(_ + _))
      val t22 = System.nanoTime()
      println("TreeReduce - Euclidean Distance " + euclideanDistanceTreeRed)
      treeReduceTimeArr(i) = (t22 - t2) / 1000000.0
      println("TreeReduce - Elapsed time: " + (t22 - t2) / 1000000.0 + "ms")
    }

    // Variant 3: treeAggregate - fold the squared differences directly, without an intermediate mapped RDD
    for (i <- 0 until 20) {
      val t3 = System.nanoTime()
      val euclideanDistanceTreeAggr = sqrt(xy.treeAggregate(0.0)(
        seqOp = (c, v) => c + ((v._1 - v._2) * (v._1 - v._2)),
        combOp = (c1, c2) => c1 + c2))
      val t33 = System.nanoTime()
      println("TreeAggregate - Euclidean Distance " + euclideanDistanceTreeAggr)
      treeAggregateTimeArr(i) = (t33 - t3) / 1000000.0
      println("TreeAggregate - Elapsed time: " + (t33 - t3) / 1000000.0 + "ms")
    }

    // Summary statistics over the 20 runs
    val mapReduceAvgTime = mapReduceTimeArr.sum / mapReduceTimeArr.length
    val treeReduceAvgTime = treeReduceTimeArr.sum / treeReduceTimeArr.length
    val treeAggregateAvgTime = treeAggregateTimeArr.sum / treeAggregateTimeArr.length

    val mapReduceMinTime = mapReduceTimeArr.min
    val treeReduceMinTime = treeReduceTimeArr.min
    val treeAggregateMinTime = treeAggregateTimeArr.min

    val mapReduceMaxTime = mapReduceTimeArr.max
    val treeReduceMaxTime = treeReduceTimeArr.max
    val treeAggregateMaxTime = treeAggregateTimeArr.max

    println("Map-Reduce    - Avg: " + mapReduceAvgTime + "ms Max: " + mapReduceMaxTime + "ms Min: " + mapReduceMinTime + "ms")
    println("TreeReduce    - Avg: " + treeReduceAvgTime + "ms Max: " + treeReduceMaxTime + "ms Min: " + treeReduceMinTime + "ms")
    println("TreeAggregate - Avg: " + treeAggregateAvgTime + "ms Max: " + treeAggregateMaxTime + "ms Min: " + treeAggregateMinTime + "ms")
  }
}
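Both treeReduce and treeAggregate also accept an optional depth parameter (default 2) that controls how many levels of intermediate aggregation Spark uses before the final combine on the driver. The gist above relies on the default; the snippet below is only a sketch, not part of the original benchmark, showing how the tree depth could be made explicit on the same cached xy RDD (the variable names distDepth2 and distDepth4 are illustrative):

// Sketch only: same squared-difference aggregation as above, with an explicit tree depth.
// Assumes `xy` is the cached RDD[(Double, Double)] built in the code above and that
// scala.math.sqrt is imported as in the gist.
val squaredDiffs = xy.map { case (v1, v2) => (v1 - v2) * (v1 - v2) }
val distDepth2 = sqrt(squaredDiffs.treeReduce(_ + _, depth = 2))   // Spark's default depth
val distDepth4 = sqrt(xy.treeAggregate(0.0)(
  seqOp = (c, v) => c + (v._1 - v._2) * (v._1 - v._2),
  combOp = _ + _,
  depth = 4))                                                      // deeper tree, more combine stages

A larger depth can help when there are many partitions and the per-partition results are large, at the cost of extra shuffle stages; for a small benchmark like this one the default is usually fine.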