| 
          package LearningScala.LearningScala.local.kmeans | 
        
        
           | 
          
 | 
        
        
           | 
          object kmeans {

            // Find K Means of device status locations
            //
            // Input data: file(s) with device status data (delimited by ',')
            // including latitude (4th field) and longitude (5th field) of device locations
            // (lat,lon of 0,0 indicates unknown location)

            import scala.math.pow
            import org.apache.spark.SparkContext
            import org.apache.spark.SparkConf

            /** Squared Euclidean distance between two (x, y) points. */
            def distanceSquared(p1: (Double, Double), p2: (Double, Double)): Double =
              pow(p1._1 - p2._1, 2) + pow(p1._2 - p2._2, 2)

            /** Component-wise sum of two (x, y) points. */
            def addPoints(p1: (Double, Double), p2: (Double, Double)): (Double, Double) =
              (p1._1 + p2._1, p1._2 + p2._2)

            /**
             * Returns the index in `points` of the point closest to `p` (by squared distance).
             * Ties resolve to the lowest index; an empty `points` array yields 0.
             */
            def closestPoint(p: (Double, Double), points: Array[(Double, Double)]): Int = {
              var bestIndex = 0
              var closest = Double.PositiveInfinity
              for (i <- points.indices) {
                val dist = distanceSquared(p, points(i))
                if (dist < closest) {
                  closest = dist
                  bestIndex = i
                }
              }
              bestIndex
            }

            /**
             * Builds the centers for the next iteration. Center `i` becomes `averages(i)` when
             * cluster `i` received at least one point; otherwise it keeps its current value.
             * An empty cluster has no entry in `averages`, so a direct lookup would throw.
             *
             * @param current  centers used in the iteration that just ran
             * @param averages cluster index -> mean of the points assigned to that cluster
             * @return a new array with the same length as `current`
             */
            def nextCenters(current: Array[(Double, Double)],
                            averages: scala.collection.Map[Int, (Double, Double)]): Array[(Double, Double)] =
              current.indices.map(i => averages.getOrElse(i, current(i))).toArray

            /**
             * Sum of squared distances between corresponding centers of `before` and `after`.
             * Only positions present in both arrays are compared.
             */
            def totalShift(before: Array[(Double, Double)], after: Array[(Double, Double)]): Double =
              before.zip(after).map { case (a, b) => distanceSquared(a, b) }.sum

            /**
             * Parses one comma-delimited device status line into (field 1, (field 3, field 4)),
             * i.e. (device, (latitude, longitude)). Lines with too few fields or non-numeric
             * coordinates throw, exactly as the inline parsing did before.
             */
            private def parseDevice(line: String): (String, (Double, Double)) = {
              val fields = line.split(',')
              (fields(1), (fields(3).toDouble, fields(4).toDouble))
            }

            /** A location of (0, 0) marks an unknown position and is excluded from clustering. */
            private def isKnownLocation(point: (Double, Double)): Boolean =
              !(point._1 == 0 && point._2 == 0)

            def main(args: Array[String]): Unit = {

              // Set Windows System property
              //System.setProperty("hadoop.home.dir", "c:/winutil/");

              val conf = new SparkConf().setAppName("First Scala app").setMaster("local[*]")
              val sc = new SparkContext(conf)

              // Always stop the SparkContext, even if a job fails part-way through.
              try {
                // The device status data file(s)
                val filename = "loudacre/*"

                // K is the number of means (center points of clusters) to find
                val K = 5

                // Convergence threshold: stop once the summed *squared* movement of all centers
                // between two consecutive iterations is no greater than this value
                val convergeDist = .1

                // Parse the device status data file into (lat, lon) pairs, dropping unknown (0,0) locations
                val fileRdd = sc.textFile(filename)
                val pairLatLongRdd = fileRdd.map(line => parseDevice(line)._2).filter(isKnownLocation).persist()

                println(pairLatLongRdd.count())

                for ((lat, lon) <- pairLatLongRdd.take(2)) {
                  println(s"Lat: $lat Long : $lon")
                }

                // Start with K randomly selected points from the dataset as center points.
                // takeSample returns fewer than K points when the dataset holds fewer than K,
                // so everything below works on kPoints' actual length rather than on K.
                var kPoints = pairLatLongRdd.takeSample(withReplacement = false, K, 42)

                println("K Center points initialized :")
                for ((lat, lon) <- kPoints) {
                  println(s"Lat: $lat Long : $lon")
                }

                // Loop until the summed squared movement of the centers between one iteration
                // and the next drops to convergeDist or below
                var tempDist = Double.PositiveInfinity

                while (tempDist > convergeDist) {
                  // Snapshot the current centers in a val so the Spark closure below captures a
                  // fixed value rather than the driver-side var
                  val centers = kPoints

                  // For each point, find the index of the closest center and map to (index, (point, 1)), e.g.
                  // (1, ((2.1,-3.4),1))
                  // (0, ((5.1,-7.4),1))
                  // (1, ((8.1,-4.4),1))
                  val closestToKpointRdd = pairLatLongRdd.map(point => (closestPoint(point, centers), (point, 1)))

                  // For each center index, sum (addPoints) the coordinates of all its closest points
                  // and count them, e.g.
                  // (1, ((4.325,-5.444),2314))
                  // (0, ((6.342,-7.532),4323))
                  // The reduced RDD has at most K members: a center that attracted no points is absent.
                  val pointCalculatedRdd = closestToKpointRdd.reduceByKey {
                    case ((point1, n1), (point2, n2)) => (addPoints(point1, point2), n1 + n2)
                  }

                  // Average each cluster: (index, ((totalX, totalY), n)) -> (index, (totalX / n, totalY / n))
                  val newPoints = pointCalculatedRdd
                    .map { case (i, (total, n)) => (i, (total._1 / n, total._2 / n)) }
                    .collectAsMap()

                  // Clusters that received no points keep their previous center instead of
                  // failing the lookup
                  val nextKPoints = nextCenters(centers, newPoints)

                  // The summed squared movement is the delta between iterations
                  tempDist = totalShift(centers, nextKPoints)
                  println(s"Distance between iterations: $tempDist")

                  kPoints = nextKPoints
                }

                // Display the final center points
                println("Final center points :")
                kPoints.foreach(println)

                // Take 10 randomly selected devices from the dataset and report each one's closest center
                val deviceRdd = fileRdd.map(parseDevice).filter { case (_, point) => isKnownLocation(point) }.persist()
                val samples = deviceRdd.takeSample(withReplacement = false, 10, 42)

                for ((device, point) <- samples) {
                  val k = closestPoint(point, kPoints)
                  println(s"device: $device to K: $k")
                }
              } finally {
                sc.stop()
              }
            }
          }