@robenalt
Forked from libratiger/kmeans.scala
Created August 21, 2014 14:26
package mllib

import scala.util.Random

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering.KMeans

object MyKmeans {

  def main(args: Array[String]) {
    // Cluster and input locations (adjust to your environment).
    val sparkMaster = "spark://192.168.35.27:7077"
    val input_path = "hdfs://192.168.35.10:54310/kmeans/data101" // defined but unused: the data is generated below

    // Synthetic data set: nexamples points, each with nfeatures random features,
    // spread across `parts` partitions.
    val nexamples = 1000
    val nfeatures = 100000
    val parts = 100

    val conf = new SparkConf()
    conf.setMaster(sparkMaster)
    conf.setAppName("Kmeans")
    conf.setJars(Seq("target/scala-2.10/tinyfish_2.10-1.0.jar"))
    conf.set("spark.worker.timeout", "300")
    conf.set("spark.akka.frameSize", "200")
    conf.set("spark.executor.memory", "50g")
    conf.set("spark.storage.memoryFraction", "0.4")
    conf.set("spark.akka.threads", "42")
    conf.set("spark.shuffle.consolidateFiles", "true")

    val sc = new SparkContext(conf)

    // Generate the data in parallel: each example is a dense vector of nfeatures
    // values drawn uniformly from [-1, 1), seeded per example so the data set
    // is reproducible.
    val data = sc.parallelize(0 until nexamples, parts).map { idx =>
      val rnd = new Random(42 + idx)
      val x = Array.fill[Double](nfeatures) {
        rnd.nextDouble() * 2.0 - 1.0
      }
      Vectors.dense(x)
    }.cache()

    // Force materialization of the cached RDD before clustering.
    val cnt = data.count()
    println("cnt: " + cnt)

    // Run k-means with 10 clusters for 3 iterations.
    val numIterations = 3
    val numClusters = 10
    val clusters = KMeans.train(data, numClusters, numIterations)
    println("finished")
  }
}
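
A minimal follow-up sketch of how the trained model could be inspected, assuming the Spark 1.x MLlib KMeansModel API (computeCost and predict); the `clusters` and `data` variables are those defined in main above:

    // Within-set sum of squared errors: a rough measure of clustering tightness.
    val wssse = clusters.computeCost(data)
    println("WSSSE: " + wssse)

    // Assign a single point (here, the first generated example) to its nearest cluster.
    val firstPoint = data.first()
    println("cluster of first point: " + clusters.predict(firstPoint))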