An ApproximateDistribution wrapper class for Ted Dunning's Java t-digest library
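The code pulls in the t-digest, commons-math3, and Spark artifacts; a minimal sbt sketch of those dependencies follows (the group and artifact IDs match the imports below, but the version numbers are assumptions and should be pinned to whatever the rest of your build uses).

// build.sbt sketch -- version numbers are assumptions, not taken from the gist
libraryDependencies ++= Seq(
  "com.tdunning"       %  "t-digest"      % "3.1",   // TreeDigest, Centroid
  "org.apache.commons" %  "commons-math3" % "3.6.1", // NormalDistribution (example only)
  "org.apache.spark"   %% "spark-core"    % "1.6.0"  // RDD, SparkContext, Kryo (example only)
)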
package com.preact.platform.math.models

import java.lang.System._
import java.util

import com.tdunning.math.stats.{Centroid, TreeDigest}
import org.apache.commons.math3.distribution.NormalDistribution
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

import scala.collection.immutable.IndexedSeq
object ApproximateDistribution {
  val defaultCompression = 100.0
  val outputDistributionPoints = 100

  def apply(compression: Double = defaultCompression): ApproximateDistribution =
    new ApproximateDistribution(new TreeDigest(compression))

  def toApproximateDistribution(compression: Double = defaultCompression)(x: Double): ApproximateDistribution = {
    val tD = ApproximateDistribution(compression)
    tD.add(x)
    tD
  }
  def toApproximateDistribution(compression: Double)(X: Array[Double]): ApproximateDistribution = {
    val TD = ApproximateDistribution(compression)
    X.foreach { x => TD.add(x) }
    TD
  }
  implicit class DoublesToApproximateDistribution(x: Double) {
    def toApproximateDistribution(compression: Double = defaultCompression): ApproximateDistribution = {
      val ap = ApproximateDistribution(compression)
      ap.add(x)
      ap
    }
  }

  implicit class ArrayDoublesToApproximateDistribution(X: Array[Double]) {
    def toApproximateDistribution(compression: Double = defaultCompression): ApproximateDistribution = {
      val ap = ApproximateDistribution(compression)
      X.foreach { x => ap.add(x) }
      ap
    }
  }
}
class ApproximateDistribution(protected val digest: TreeDigest) {
  var min: Double = Double.MaxValue
  var max: Double = Double.MinValue

  def size: Long = this.digest.size()

  def updateMinMax(x: Double): Unit = {
    this.min = Math.min(x, this.min)
    this.max = Math.max(x, this.max)
  }

  def add(x: Double, w: Int = 1): Unit = {
    this.updateMinMax(x)
    this.digest.add(x, w)
  }

  def add(other: ApproximateDistribution): Unit = {
    this.min = Math.min(other.min, this.min)
    this.max = Math.max(other.max, this.max)
    this.digest.add(other.digest)
  }
  // Merges the other distribution into this one in place and returns this,
  // which is what lets it serve as the combiner in RDD.reduce below.
  def ++(other: ApproximateDistribution): ApproximateDistribution = {
    this.add(other)
    this
  }
  def cdf(x: Double): Double = this.digest.cdf(x)
  def quantile(q: Double): Double = this.digest.quantile(q)
  def centroids: util.Collection[Centroid] = this.digest.centroids()
  def genCDF(points: Int = ApproximateDistribution.outputDistributionPoints,
             // by default goes from min to max of seen data, but we can truncate it with these args
             supportMin: Option[Double] = None,
             supportMax: Option[Double] = None): Seq[(Double, Double)] = {
    val mn = supportMin.getOrElse(this.min)
    val mx = supportMax.getOrElse(this.max)
    val stepSize: Double = (mx - mn) / points.toDouble
    // build the support by index rather than a Double range, to avoid floating-point drift
    val support: Seq[Double] = (0 to points).map { i => mn + i * stepSize }
    support.map { x => (x, this.digest.cdf(x)) }
  }
  def genPDF(points: Int = ApproximateDistribution.outputDistributionPoints,
             // by default goes from min to max of seen data, but we can truncate it with these args
             supportMin: Option[Double] = None,
             supportMax: Option[Double] = None): Seq[(Double, Double)] = {
    // differentiate the CDF from the left: the density at x_i is cdf(x_i) - cdf(x_{i-1})
    val cdf = this.genCDF(points = points, supportMin = supportMin, supportMax = supportMax)
    cdf.head +: cdf.sliding(2).map { s =>
      val (l, r) = (s.head, s.last)
      (r._1, r._2 - l._2)
    }.toSeq
  }
}
object ApproximateDistributionSparkKryoExample {
  import ApproximateDistribution.{DoublesToApproximateDistribution, ArrayDoublesToApproximateDistribution}

  def main(arg: Array[String]): Unit = {
    val appName: String = "TDigest-Test"
    val conf: SparkConf = new SparkConf().setAppName(appName).setMaster("local[16]")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[TreeDigest]))
    val sc: SparkContext = new SparkContext(conf)

    val trueDist0: NormalDistribution = new NormalDistribution(100.0, 15)
    val trueDist2: NormalDistribution = new NormalDistribution(100.0, 15)
    val compression = 300.0
    val data: IndexedSeq[Double] = (0 until 100000).map { i => trueDist0.sample() }

    // local (single-JVM) digest, for comparison against the Spark versions
    val TDlocal: ApproximateDistribution = ApproximateDistribution(compression)
    data.foreach { x => TDlocal.add(x, 1) }

    val startTime = currentTimeMillis()
    val dataRDD: RDD[Double] = sc.parallelize(data)
    // one digest per element, merged pairwise
    val TDspark = dataRDD.map { _.toApproximateDistribution() }.reduce(_ ++ _)
    // one digest per partition (glom), then merged -- far fewer digests to combine
    val TDGlommedspark = dataRDD.glom().map { _.toApproximateDistribution() }.reduce(_ ++ _)

    val testX = 100.0
    println("Running TreeDigest in spark using Kryo")
    println(s"Took ${(currentTimeMillis() - startTime) / 1000d} seconds.")
    println(s"From spark         : ${TDspark.cdf(testX)}")
    println(s"From spark glommed : ${TDGlommedspark.cdf(testX)}")
    println(s"From local         : ${TDlocal.cdf(testX)}")
  }
}
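A minimal local usage sketch of the wrapper (the object name and sample values here are illustrative additions, not part of the original gist): digests are built from raw doubles via the implicit conversions, merged with ++, and then queried.

object ApproximateDistributionLocalUsageSketch {
  import ApproximateDistribution.ArrayDoublesToApproximateDistribution

  def main(args: Array[String]): Unit = {
    val left  = Array(1.0, 2.0, 3.0, 4.0).toApproximateDistribution()
    val right = Array(5.0, 6.0, 7.0, 8.0).toApproximateDistribution()
    val merged = left ++ right          // ++ merges in place and returns the left operand
    println(merged.cdf(4.5))            // approximate P(X <= 4.5)
    println(merged.quantile(0.5))       // approximate median
    println(merged.genCDF(points = 10)) // (x, cdf(x)) pairs across the observed support
  }
}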