{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# PySpark Vectorized UDFs using Arrow\n",
"Using Arrow, it is possible to perform vectorized evaluation of Python UDFs that accept one or more `pandas.Series` as input and return a single `pandas.Series` of equal length. Vectorized functions offer a performance boost over the current PySpark UDF evaluation, which loops over the data one row at a time.\n",
"\n",
"The following assumes you have started a Jupyter Notebook with a PySpark kernel that creates a default SparkSession `spark`. For a quick method to start this up, see [here](https://gist.github.com/BryanCutler/b7f10167c4face19e03330a07b24ce21).\n", | |
"\n", | |
"\n", | |
"## Where to get it\n", | |
"This functionality is currently pending review and has not yet been merged into Spark, see [SPARK-21404](https://issues.apache.org/jira/browse/SPARK-21404). Until then, a patch for this can be downloaded from the branch in the PR [here](https://patch-diff.githubusercontent.com/raw/apache/spark/pull/18659.diff)." | |
] | |
}, | |
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## PySpark API\n",
"A new API has been added in PySpark to declare a vectorized UDF. As with normal UDFs, you can wrap a function or use a decorator:\n",
"\n",
"```python\n",
"# Wrap the function \"func\"\n",
"pandas_udf(func, DoubleType())\n",
"\n",
"# Use a decorator\n",
"@pandas_udf(returnType=DoubleType())\n",
"def func(x):\n",
"    # do something with \"x\" (pandas.Series) and return \"y\" (also a pandas.Series)\n",
"    return y\n",
"```"
]
},
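{
"cell_type": "markdown",
"metadata": {},
"source": [
"Below is a small end-to-end sketch of the decorator form (added for illustration; the function name `plus_one` and the toy DataFrame are not part of the original example, and it assumes the patched `pandas_udf` described above is available):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.sql.functions import pandas_udf, col\n",
"from pyspark.sql.types import DoubleType\n",
"\n",
"# a vectorized UDF for illustration: adds 1.0 to every element of the input column\n",
"@pandas_udf(returnType=DoubleType())\n",
"def plus_one(x):\n",
"    # \"x\" arrives as a pandas.Series, so the arithmetic is applied element-wise\n",
"    return x + 1.0\n",
"\n",
"spark.range(4).toDF(\"id\") \\\n",
"    .withColumn(\"id_plus_one\", plus_one(col(\"id\").cast(\"double\"))) \\\n",
"    .show()"
]
},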
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example Usage\n",
"Let's go through a simple example: first evaluate a UDF without vectorization, then the same UDF with vectorization enabled. Start by defining some sample data:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.sql.functions import col, udf, mean, rand\n",
"from pyspark.sql.types import *\n",
"\n",
"df = spark.range(1 << 24, numPartitions=16).toDF(\"id\") \\\n",
"    .withColumn(\"p1\", rand()).withColumn(\"p2\", rand())"
]
},
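{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick peek at the schema and a few rows of the generated data (added here as a convenience; output not shown):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# inspect the sample data that the UDFs below will receive\n",
"df.printSchema()\n",
"df.show(3)"
]
},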
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### First define the function *without vectorization*"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from math import log, exp\n",
"\n",
"def my_func(p1, p2):\n",
"    w = 0.5\n",
"    return exp(log(p1) + log(p2) - log(w))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"and evaluate it as a UDF (using `filter()` to force evaluation)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 loop, best of 3: 13.6 s per loop\n"
]
}
],
"source": [
"my_udf = udf(my_func, DoubleType())\n",
"\n",
"result = df.withColumn(\"p\", my_udf(col(\"p1\"), col(\"p2\")))\n",
"\n",
"%timeit result.filter(\"p < 1.0\").count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Now define the function *with vectorization*"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from numpy import log, exp\n",
"\n",
"def my_func(p1, p2):\n",
"    w = 0.5\n",
"    return exp(log(p1) + log(p2) - log(w))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"and evaluate the UDF again, this time making use of Arrow to evaluate `my_func` with `p1` and `p2` as `pandas.Series`, which will also cause the expression to return a `pandas.Series` of the same size.\n",
"\n",
"NOTE: Spark will not accept NumPy types as return values from a plain (row-at-a-time) UDF, which is why we need to redefine the function here. This is a known issue, see [SPARK-12157](https://issues.apache.org/jira/browse/SPARK-12157)."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 loop, best of 3: 2 s per loop\n"
]
}
],
"source": [
"from pyspark.sql.functions import pandas_udf\n",
"\n",
"my_udf = pandas_udf(my_func, DoubleType())\n",
"\n",
"result = df.withColumn(\"p\", my_udf(col(\"p1\"), col(\"p2\")))\n",
"\n",
"%timeit result.filter(\"p < 1.0\").count()"
]
},
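{
"cell_type": "markdown",
"metadata": {},
"source": [
"As an added sanity check (not part of the original benchmark), the same formula can be written with Spark's built-in column functions and compared against the vectorized UDF; the maximum absolute difference should be at the level of floating-point rounding. The `spark_` aliases below are only there to avoid clashing with the `numpy` names imported above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.sql.functions import exp as spark_exp, log as spark_log, lit, abs as spark_abs, max as spark_max\n",
"\n",
"# the same formula, expressed with built-in column expressions\n",
"expected = spark_exp(spark_log(col(\"p1\")) + spark_log(col(\"p2\")) - spark_log(lit(0.5)))\n",
"\n",
"# compare against the column \"p\" produced by the vectorized UDF\n",
"result.withColumn(\"p_expected\", expected) \\\n",
"    .agg(spark_max(spark_abs(col(\"p\") - col(\"p_expected\"))).alias(\"max_abs_diff\")) \\\n",
"    .show()"
]
},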
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"## Make better use of pandas and NumPy\n",
"\n",
"Since the inputs to your UDF are `pandas.Series`, you can use pandas and NumPy operations on the data and also return a `pandas.Series` or NumPy array. For example, say we want to draw samples from a random distribution for data points with a specific label."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+------+\n",
"|label|counts|\n",
"+-----+------+\n",
"|    0|   7.0|\n",
"|    1|   0.0|\n",
"|    2|   0.0|\n",
"|    0|   8.0|\n",
"|    1|   0.0|\n",
"|    2|   0.0|\n",
"|    0|   7.0|\n",
"+-----+------+\n",
"only showing top 7 rows\n",
"\n"
]
}
],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"df = spark.range(1 << 20).toDF(\"id\") \\\n",
"    .selectExpr(\"(id % 3) AS label\")\n",
"\n",
"def sample(label):\n",
"    \"\"\"\n",
"    Sample selected data from a Poisson distribution\n",
"    :param label: pandas.Series of data labels\n",
"    \"\"\"\n",
"\n",
"    # use numpy to initialize an array of zeros\n",
"    p = pd.Series(np.zeros(len(label)))\n",
"\n",
"    # use pandas to select data matching label \"0\"\n",
"    idx0 = label == 0\n",
"\n",
"    # sample from numpy and assign to the selected data\n",
"    p[idx0] = np.random.poisson(7, len(idx0))\n",
"\n",
"    # return the pandas series\n",
"    return p\n",
"\n",
"sample_udf = pandas_udf(sample, DoubleType())\n",
"\n",
"result = df.withColumn(\"counts\", sample_udf(col(\"label\")))\n",
"result.show(n=7)"
]
},
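{
"cell_type": "markdown",
"metadata": {},
"source": [
"To double-check the sampling (an added aside, not in the original gist), the average of `counts` per label should be close to the Poisson mean of 7 for label 0 and exactly 0 for the other labels:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# average sampled count per label, using the mean function imported earlier\n",
"result.groupBy(\"label\").agg(mean(\"counts\").alias(\"avg_counts\")).show()"
]
},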
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.13"
}
},
"nbformat": 4,
"nbformat_minor": 2
} |
Hi Bryan,
I have been able to get pandas_udf to work in an IPython notebook, but it does not work with spark-submit:
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("id int", PandasUDFType.GROUPED_MAP)
def featurize_udf(dfp):
    return pd.DataFrame({'id': [1, 2]})

gdf = df.groupBy('username').apply(featurize_udf)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:333)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:322)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:177)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:121)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:158)
... 24 more
Is this ready for production?
Looks like the Jira ticket is closed and this is part of Spark 2.3, thanks!