@erikerlandson
Created July 8, 2017 19:10
Demo using T-Digest UDAFs from pyspark
[eje@linux spark]$ ./bin/pyspark --jars /home/eje/git/isarn-sketches-spark/target/scala-2.11/isarn-sketches-spark-assembly-0.1.0.jar --driver-class-path /home/eje/git/isarn-sketches-spark/target/scala-2.11/isarn-sketches-spark-assembly-0.1.0.jar
Python 2.7.13 (default, May 10 2017, 20:04:28)
[GCC 6.3.1 20161221 (Red Hat 6.3.1-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/07/08 12:04:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
Using Python version 2.7.13 (default, May 10 2017 20:04:28)
SparkSession available as 'spark'.
>>> from pyspark.sql.column import Column, _to_java_column, _to_seq
>>> from pyspark.sql import Row
>>> def tdigest(col):
...     asc = SparkContext._active_spark_context
...     tdapply = asc._jvm.org.isarnproject.sketches.udaf.TDigestDoubleUDAF.apply
...     return Column(tdapply(_to_seq(asc, [col], _to_java_column)))
...
>>> row = Row("x1","x2")
>>> df = spark.sparkContext.parallelize([row(1.0, 2.0),row(1.5,2.5),row(2.0,3.0)]).toDF()
>>> df.show()
+---+---+
| x1| x2|
+---+---+
|1.0|2.0|
|1.5|2.5|
|2.0|3.0|
+---+---+
>>> df.agg(tdigest("x1")).show()
+----------------------+
|tdigestdoubleudaf$(x1)|
+----------------------+
|  TDigestSQL(TDiges...|
+----------------------+
>>> df.agg(tdigest("x2")).show()
+----------------------+
|tdigestdoubleudaf$(x2)|
+----------------------+
|  TDigestSQL(TDiges...|
+----------------------+
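
Each aggregation above returns a t-digest, which is an approximate summary of a column's distribution. For a dataset this small, the exact quantiles are easy to compute in plain Python, which gives a rough reference for what a sketch built from the same values should report. This is only a comparison sketch: `exact_quantile` is a hypothetical helper, not part of isarn-sketches or pyspark, and it assumes linear interpolation between order statistics, which may differ from how a t-digest interpolates.

```python
# Exact quantiles for the demo columns, for comparison with a sketch's output.
# `exact_quantile` is a hypothetical helper; it is not part of isarn-sketches
# or pyspark. It assumes linear interpolation between order statistics.

def exact_quantile(values, q):
    """Return the q-quantile (0 <= q <= 1) of a non-empty list of numbers."""
    if not values:
        raise ValueError("values must be non-empty")
    if not 0.0 <= q <= 1.0:
        raise ValueError("q must be in [0, 1]")
    xs = sorted(values)
    pos = q * (len(xs) - 1)        # fractional index into the sorted data
    lo = int(pos)                  # order statistic at or below pos
    hi = min(lo + 1, len(xs) - 1)  # next order statistic, clamped to the end
    frac = pos - lo
    return xs[lo] + (xs[hi] - xs[lo]) * frac

# Same values as the x1 and x2 columns of the demo DataFrame.
x1 = [1.0, 1.5, 2.0]
x2 = [2.0, 2.5, 3.0]

print(exact_quantile(x1, 0.5))
print(exact_quantile(x2, 0.5))
```

The two calls print the exact medians of `x1` and `x2`. A digest over the same three rows would be expected to land near these values, though this transcript does not query the digests, so that is not verified here.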