Created July 8, 2017 19:10
Demo using T-Digest UDAFs from pyspark
[eje@linux spark]$ ./bin/pyspark --jars /home/eje/git/isarn-sketches-spark/target/scala-2.11/isarn-sketches-spark-assembly-0.1.0.jar --driver-class-path /home/eje/git/isarn-sketches-spark/target/scala-2.11/isarn-sketches-spark-assembly-0.1.0.jar
Python 2.7.13 (default, May 10 2017, 20:04:28)
[GCC 6.3.1 20161221 (Red Hat 6.3.1-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/07/08 12:04:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.13 (default, May 10 2017 20:04:28)
SparkSession available as 'spark'.
>>> from pyspark.sql.column import Column, _to_java_column, _to_seq
>>> from pyspark.sql import Row
>>> def tdigest(col):
...     asc = SparkContext._active_spark_context
...     tdapply = asc._jvm.org.isarnproject.sketches.udaf.TDigestDoubleUDAF.apply
...     return Column(tdapply(_to_seq(asc, [col], _to_java_column)))
...
>>> row = Row("x1", "x2")
>>> df = spark.sparkContext.parallelize([row(1.0, 2.0), row(1.5, 2.5), row(2.0, 3.0)]).toDF()
>>> df.show()
+---+---+
| x1| x2|
+---+---+
|1.0|2.0|
|1.5|2.5|
|2.0|3.0|
+---+---+

>>> df.agg(tdigest("x1")).show()
+----------------------+
|tdigestdoubleudaf$(x1)|
+----------------------+
| TDigestSQL(TDiges...|
+----------------------+

>>> df.agg(tdigest("x2")).show()
+----------------------+
|tdigestdoubleudaf$(x2)|
+----------------------+
| TDigestSQL(TDiges...|
+----------------------+
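The `TDigestDoubleUDAF` aggregation above compresses each column into a t-digest: a small set of weighted centroids from which approximate quantiles can be read back. The toy class below is not the isarn-sketches implementation; it is a deliberately simplified sketch of the centroid idea, merging the closest adjacent pair whenever a fixed budget is exceeded, whereas a real t-digest bounds centroid weights with a quantile-dependent scale function to keep the tails accurate. The class name `TinyDigest` and its parameters are illustrative only.

```python
import bisect
import random

class TinyDigest(object):
    """Toy fixed-size centroid sketch for approximate quantiles.

    Simplified stand-in for a t-digest: keeps at most `max_centroids`
    (mean, weight) pairs sorted by mean, and merges the two closest
    adjacent centroids when the budget is exceeded.
    """

    def __init__(self, max_centroids=64):
        self.max_centroids = max_centroids
        self.centroids = []  # sorted list of [mean, weight]

    def update(self, x):
        # insert the new point as a unit-weight centroid, keeping order
        bisect.insort(self.centroids, [float(x), 1.0])
        if len(self.centroids) > self.max_centroids:
            # merge the adjacent pair with the smallest gap in means
            i = min(range(len(self.centroids) - 1),
                    key=lambda j: self.centroids[j + 1][0] - self.centroids[j][0])
            (m1, w1), (m2, w2) = self.centroids[i], self.centroids[i + 1]
            w = w1 + w2
            self.centroids[i:i + 2] = [[(m1 * w1 + m2 * w2) / w, w]]

    def quantile(self, q):
        # walk the centroids until cumulative weight reaches q of the total
        total = sum(w for _, w in self.centroids)
        target = q * total
        cum = 0.0
        for m, w in self.centroids:
            cum += w
            if cum >= target:
                return m
        return self.centroids[-1][0]

if __name__ == "__main__":
    random.seed(1)
    d = TinyDigest(64)
    for _ in range(5000):
        d.update(random.random())
    # for uniform data on [0, 1], the median estimate should land near 0.5
    print(d.quantile(0.5))
```

The attraction of sketches like this for Spark is that they are mergeable: each partition can build its own digest and the partial results combine associatively, which is exactly the shape a `UserDefinedAggregateFunction` needs.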