Helpers for Ted Dunning's Java TDigest library
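The helpers assume the t-digest, Spark, and commons-math3 artifacts on the classpath. A minimal sbt sketch (the coordinates are the published ones; the version numbers are assumptions matching the era of this gist):

libraryDependencies ++= Seq(
  "com.tdunning"       %  "t-digest"      % "3.0",
  "org.apache.commons" %  "commons-math3" % "3.2",
  "org.apache.spark"   %% "spark-core"    % "1.3.1"
)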
package com.preact.platform.math.models

import java.lang.System._
import java.nio.ByteBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.commons.math3.distribution.ExponentialDistribution
import org.apache.commons.math3.distribution.NormalDistribution
import com.tdunning.math.stats.TreeDigest

import scala.collection.immutable.IndexedSeq
object TreeDigestHelper {

  def apply(compression: Double): TreeDigest = new TreeDigest(compression)

  /** Serialize a digest to a byte array so it can be shipped across the wire. */
  def serialize(tdigest: TreeDigest): Array[Byte] = {
    val arr = new Array[Byte](tdigest.byteSize)
    tdigest.asBytes(ByteBuffer.wrap(arr))
    arr
  }

  /** Rebuild a digest from the bytes produced by serialize. */
  def deserialize(arr: Array[Byte]): TreeDigest = {
    TreeDigest.fromBytes(ByteBuffer.wrap(arr))
  }

  /** Merge two serialized digests; usable as a reduce function over byte arrays. */
  def reduceMergeDigests(lBytes: Array[Byte], rBytes: Array[Byte]): Array[Byte] = {
    val lTD = this.deserialize(lBytes)
    val rTD = this.deserialize(rBytes)
    lTD.add(rTD)
    this.serialize(lTD)
  }

  /** Wrap a single observation in a one-point digest, serialized. */
  def mapToDigest(compression: Double)(x: Double): Array[Byte] = {
    val TD = new TreeDigest(compression)
    TD.add(x, 1)
    this.serialize(TD)
  }

  /** Fold a batch of observations into one digest, serialized. */
  def mapArrayToDigest(compression: Double)(X: Seq[Double]): Array[Byte] = {
    val TD = new TreeDigest(compression)
    X.foreach { x => TD.add(x, 1) }
    this.serialize(TD)
  }

  /** Evaluate the CDF on an evenly spaced grid over [supportMin, supportMax]. */
  def genCDF(supportMin: Double, supportMax: Double, points: Int = 100)(digest: TreeDigest): Seq[(Double, Double)] = {
    val stepSize = (supportMax - supportMin) / points.toDouble
    val support = (supportMin to supportMax by stepSize).toSeq
    support.zip(support.map { x => digest.cdf(x) })
  }
  /** Approximate the PDF as first differences of the CDF over the same grid.
    * Each value is the probability mass in one grid step, not a normalized density. */
  def genPDF(supportMin: Double, supportMax: Double, points: Int = 100)(digest: TreeDigest): Seq[(Double, Double)] = {
    val cdf = this.genCDF(supportMin, supportMax, points)(digest)
    cdf.head +: cdf.sliding(2).map { s =>
      val (l, r) = (s.head, s.last)
      (r._1, r._2 - l._2)
    }.toSeq
  }
}
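// A minimal sketch (not in the original gist) exercising genCDF and genPDF,
// which the examples below never call. The support bounds and point count are
// arbitrary choices; NormalDistribution is already imported above.
object CDFExample {
  def main(arg: Array[String]): Unit = {
    val dist = new NormalDistribution(0.0, 1.0)
    val td = TreeDigestHelper(100.0)
    (0 until 10000).foreach { _ => td.add(dist.sample(), 1) }
    val cdf = TreeDigestHelper.genCDF(-4.0, 4.0, 200)(td)
    val pdf = TreeDigestHelper.genPDF(-4.0, 4.0, 200)(td)
    println(cdf.minBy { case (x, _) => math.abs(x) }) // CDF near x = 0 should be ~0.5
    println(pdf.maxBy(_._2))                          // mass per step peaks near the mean
  }
}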
object TDigestExample {
  def main(arg: Array[String]): Unit = {
    val trueDist0: ExponentialDistribution = new ExponentialDistribution(15)
    val TD0: TreeDigest = new TreeDigest(25.0)
    (0 until 10000).map { i => trueDist0.sample() }.foreach { x => TD0.add(x, 1) }
    // Round-trip through bytes; both digests should report the same CDF value.
    val b: Array[Byte] = TreeDigestHelper.serialize(TD0)
    val TD00: TreeDigest = TreeDigestHelper.deserialize(b)
    println(TD0.cdf(15.0))
    println(TD00.cdf(15.0))
  }
}
object TDigestSparkExample {
  def main(arg: Array[String]): Unit = {
    val appName: String = "TDigest-Test"
    val conf: SparkConf = new SparkConf().setAppName(appName).setMaster("local[16]")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[TreeDigest]))
    val sc: SparkContext = new SparkContext(conf)

    val trueDist0: ExponentialDistribution = new ExponentialDistribution(15)
    val compression = 25.0
    val data = (0 until 100000).map { i => trueDist0.sample() }

    // Local reference digest, built on the driver.
    val TDlocal = new TreeDigest(compression)
    data.foreach { x => TDlocal.add(x, 1) }

    // Distributed digest: one serialized single-point digest per element, merged pairwise.
    val startTime = currentTimeMillis()
    val dataRDD: RDD[Double] = sc.parallelize(data)
    val tdBytes: Array[Byte] = dataRDD.map(TreeDigestHelper.mapToDigest(compression))
      .reduce(TreeDigestHelper.reduceMergeDigests)
    val TDspark = TreeDigestHelper.deserialize(tdBytes)

    println(s"Took ${(currentTimeMillis() - startTime) / 1000d} seconds.")
    println(s"From spark : ${TDspark.cdf(15.0)}")
    println(s"From local : ${TDlocal.cdf(15.0)}")
    sc.stop()
  }
}
object TDigestSparkKryoExample {
  def main(arg: Array[String]): Unit = {
    val appName: String = "TDigest-Test"
    val conf: SparkConf = new SparkConf().setAppName(appName).setMaster("local[16]")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[TreeDigest]))
    val sc: SparkContext = new SparkContext(conf)

    val trueDist0: NormalDistribution = new NormalDistribution(100.0, 15)
    val compression = 300.0
    val data: IndexedSeq[Double] = (0 until 100000).map { i => trueDist0.sample() }

    // Local reference digest, built on the driver.
    val TDlocal = new TreeDigest(compression)
    data.foreach { x => TDlocal.add(x, 1) }

    val startTime = currentTimeMillis()
    val dataRDD: RDD[Double] = sc.parallelize(data)
    // Per-element digests merged pairwise, shipped as TreeDigest objects via Kryo.
    val TDspark = dataRDD.map(toTD(compression)).reduce(reduceTD)
    // One digest per partition (via glom), then merged; far fewer allocations.
    val TDGlommedspark = dataRDD.glom().map(arrayToTD(compression)).reduce(reduceTD)

    val testX = 100.0
    println("Running TreeDigest in spark using Kryo")
    println(s"Took ${(currentTimeMillis() - startTime) / 1000d} seconds.")
    println(s"From spark : ${TDspark.cdf(testX)}")
    println(s"From spark glommed : ${TDGlommedspark.cdf(testX)}")
    println(s"From local : ${TDlocal.cdf(testX)}")
    sc.stop()
  }

  /** Merge the right digest into the left; usable as a reduce function. */
  def reduceTD(l: TreeDigest, r: TreeDigest): TreeDigest = {
    l.add(r)
    l
  }

  /** Wrap a single observation in a one-point digest. */
  def toTD(compression: Double)(x: Double): TreeDigest = {
    val TD = new TreeDigest(compression)
    TD.add(x)
    TD
  }

  /** Fold a whole partition's samples into one digest. */
  def arrayToTD(compression: Double)(X: Array[Double]): TreeDigest = {
    val TD = new TreeDigest(compression)
    X.foreach { x => TD.add(x) }
    TD
  }
}
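Both Spark examples merge digests pairwise with reduce. A hedged alternative sketch, not in the original gist: RDD.treeAggregate folds each partition's samples into a single digest and then merges the partition digests in a tree, avoiding the per-element TreeDigest allocations of the map/reduce path. Here dataRDD, compression, and testX are as in TDigestSparkKryoExample, and as above TreeDigest must be Kryo-registered so the digests can ship between stages:

val TDtree: TreeDigest = dataRDD.treeAggregate(new TreeDigest(compression))(
  seqOp  = (td, x) => { td.add(x); td }, // fold each sample into the partition's digest
  combOp = (l, r)  => { l.add(r); l }    // merge partition digests pairwise
)
println(s"From treeAggregate : ${TDtree.cdf(testX)}")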