|
package LearningScala.LearningScala.local.kmeans |
|
|
|
object kmeans {

  // Find K Means of device status locations
  //
  // Input data: file(s) with device status data (delimited by ',')
  // including latitude (4th field) and longitude (5th field) of device locations
  // (lat,lon of 0,0 indicates unknown location)

  import scala.math.pow

  import org.apache.spark.SparkConf
  import org.apache.spark.SparkContext

  /** The squared Euclidean distance between two (lat, lon) points.
    *
    * Squared distance preserves nearest-centroid ordering, so the square
    * root is skipped deliberately.
    */
  def distanceSquared(p1: (Double, Double), p2: (Double, Double)): Double =
    pow(p1._1 - p2._1, 2) + pow(p1._2 - p2._2, 2)

  /** The component-wise sum of two points. */
  def addPoints(p1: (Double, Double), p2: (Double, Double)): (Double, Double) =
    (p1._1 + p2._1, p1._2 + p2._2)

  /** For a point `p` and a non-empty array of points, return the index in
    * the array of the point closest to `p` (first such index on ties).
    */
  def closestPoint(p: (Double, Double), points: Array[(Double, Double)]): Int = {
    var bestIndex = 0
    var closest = Double.PositiveInfinity

    for (i <- points.indices) {
      val dist = distanceSquared(p, points(i))
      if (dist < closest) {
        closest = dist
        bestIndex = i
      }
    }
    bestIndex
  }

  /** Runs K-means over the device status data and prints the resulting
    * cluster centers, then assigns 10 sample devices to their nearest center.
    */
  def main(args: Array[String]): Unit = {

    // Set Windows System property
    //System.setProperty("hadoop.home.dir", "c:/winutil/");

    val conf = new SparkConf().setAppName("First Scala app").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // The device status data file(s)
    val filename = "loudacre/*"

    // K is the number of means (center points of clusters) to find
    val K = 5

    // ConvergeDist -- the threshold "distance" between iterations at which we decide we are done
    val convergeDist = .1

    // Parse the device status data file into (lat, lon) pairs, dropping the
    // (0, 0) sentinel that marks an unknown location. Persisted because the
    // RDD is re-scanned on every k-means iteration.
    val fileRdd = sc.textFile(filename)
    val pairLatLongRdd = fileRdd
      .map(line => line.split(','))
      .map(fields => (fields(3).toDouble, fields(4).toDouble))
      .filter(point => !((point._1 == 0) && (point._2 == 0)))
      .persist()

    println(pairLatLongRdd.count())

    for ((a, b) <- pairLatLongRdd.take(2)) {
      println("Lat: " + a + " Long : " + b);
    }

    // start with K randomly selected points from the dataset as center points
    // (fixed seed 42 keeps runs reproducible)
    var kPoints = pairLatLongRdd.takeSample(false, K, 42)

    println("K Center points initialized :");

    for ((a, b) <- kPoints) {
      println("Lat: " + a + " Long : " + b);
    }

    // loop until the total distance between one iteration's points and the
    // next is less than the convergence distance specified
    var tempDist = Double.PositiveInfinity

    while (tempDist > convergeDist) {

      // for each point, find the index of the closest kpoint and
      // map to (index, (point, 1)), e.g.
      //   (1, ((2.1,-3.4),1))
      //   (0, ((5.1,-7.4),1))
      //   (1, ((8.1,-4.4),1))
      val closestToKpointRdd = pairLatLongRdd.map(point => (closestPoint(point, kPoints), (point, 1)))

      // For each key (k-point index), sum the latitudes and longitudes of all
      // the points closest to that k-point, together with the count of those
      // points, e.g.
      //   (1, ((4.325,-5.444),2314))
      //   (0, ((6.342,-7.532),4323))
      // The reduced RDD has at most K members.
      val pointCalculatedRdd = closestToKpointRdd.reduceByKey { case ((point1, n1), (point2, n2)) => (addPoints(point1, point2), n1 + n2) }

      // For each k-point index, the new center is the mean of its assigned
      // points: (index, (totalX, totalY), n) -> (index, (totalX/n, totalY/n)).
      // NOTE: an index whose centroid attracted no points produces no entry,
      // so this map may contain fewer than K keys.
      val newPoints = pointCalculatedRdd.map { case (i, (point, n)) => (i, (point._1 / n, point._2 / n)) }.collectAsMap()

      // Total movement between the current centers (kPoints) and the new ones.
      // That distance is the delta between iterations; when it drops below
      // convergeDist, we stop. getOrElse guards against a missing key when a
      // cluster went empty this iteration (the center simply doesn't move).
      tempDist = 0.0
      for (i <- 0 until K) {
        tempDist += distanceSquared(kPoints(i), newPoints.getOrElse(i, kPoints(i)))
      }

      println("Distance between iterations: " + tempDist);

      // Copy the new points to the kPoints array for the next iteration;
      // an empty cluster keeps its previous center.
      for (i <- 0 until K) {
        kPoints(i) = newPoints.getOrElse(i, kPoints(i))
      }
    }

    // Display the final center points
    println("Final center points :");

    for (point <- kPoints) {
      println(point);
    }

    // take 10 randomly selected devices from the dataset and assign each to
    // its nearest final center (device id is the 2nd field)
    val deviceRdd = fileRdd
      .map(line => line.split(','))
      .map(fields => (fields(1), (fields(3).toDouble, fields(4).toDouble)))
      .filter(device => !((device._2._1 == 0) && (device._2._2 == 0)))
      .persist()

    val points = deviceRdd.takeSample(false, 10, 42)

    for ((device, point) <- points) {
      val k = closestPoint(point, kPoints)
      println("device: " + device + " to K: " + k);
    }

    sc.stop()
  }

}