| { | |
| "cells": [ | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "# PySpark Vectorized UDFs using Arrow\n", | |
| "Using Arrow, it is possible to perform vectorized evaluation of Python UDFs that accept one or more `pandas.Series` as input and return a single `pandas.Series` of equal length. Vectorized functions offer a performance boost over the current way PySpark evaluates UDFs, which is a loop that iterates over one row at a time.\n", | |
| "\n", | |
| "The following assumes you have started a Jupyter Notebook with a PySpark kernel that creates a default SparkSession `spark`. For a quick method to start this up, see [here](https://gist.github.com/BryanCutler/b7f10167c4face19e03330a07b24ce21).\n", | |
| "\n", | |
| "\n", | |
| "## Where to get it\n", | |
| "This functionality is currently pending review and has not yet been merged into Spark, see [SPARK-21404](https://issues.apache.org/jira/browse/SPARK-21404). Until then, a patch for this can be downloaded from the branch in the PR [here](https://patch-diff.githubusercontent.com/raw/apache/spark/pull/18659.diff)." | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "## PySpark API\n", | |
| "A new API has been added to PySpark to declare a vectorized UDF. As with normal UDFs, you can either wrap a function or use a decorator:\n", | |
| "\n", | |
| "```python\n", | |
| "# Wrap the function \"func\"\n", | |
| "pandas_udf(func, DoubleType())\n", | |
| "\n", | |
| "# Use a decorator\n", | |
| "@pandas_udf(returnType=DoubleType())\n", | |
| "def func(x):\n", | |
| "    # do something with \"x\" (pandas.Series) and return \"y\" (also a pandas.Series)\n", | |
| "    return y\n", | |
| "```" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "## Example Usage\n", | |
| "Let's walk through a simple example, first evaluating a UDF without vectorization, then evaluating the same UDF with vectorization enabled. The following cell defines some sample data:" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 1, | |
| "metadata": { | |
| "collapsed": true | |
| }, | |
| "outputs": [], | |
| "source": [ | |
| "from pyspark.sql.functions import col, udf, mean, rand\n", | |
| "from pyspark.sql.types import *\n", | |
| "\n", | |
| "df = spark.range(1 << 24, numPartitions=16).toDF(\"id\") \\\n", | |
| " .withColumn(\"p1\", rand()).withColumn(\"p2\", rand())" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "### First define the function *without vectorization*" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 2, | |
| "metadata": { | |
| "collapsed": true | |
| }, | |
| "outputs": [], | |
| "source": [ | |
| "from math import log, exp\n", | |
| "\n", | |
| "def my_func(p1, p2):\n", | |
| "    # p1 and p2 are plain Python floats, one row per call\n", | |
| "    w = 0.5\n", | |
| "    return exp(log(p1) + log(p2) - log(w))" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "and evaluate it as a UDF (using `filter()` to force evaluation)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 3, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "1 loop, best of 3: 13.6 s per loop\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "my_udf = udf(my_func, DoubleType())\n", | |
| "\n", | |
| "result = df.withColumn(\"p\", my_udf(col(\"p1\"), col(\"p2\")))\n", | |
| "\n", | |
| "%timeit result.filter(\"p < 1.0\").count()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "### Now define the function *with vectorization*" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 4, | |
| "metadata": { | |
| "collapsed": true | |
| }, | |
| "outputs": [], | |
| "source": [ | |
| "from numpy import log, exp\n", | |
| "\n", | |
| "def my_func(p1, p2):\n", | |
| "    # p1 and p2 are pandas.Series; numpy's log and exp operate element-wise\n", | |
| "    w = 0.5\n", | |
| "    return exp(log(p1) + log(p2) - log(w))" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "and evaluate the UDF again, this time making use of Arrow to evaluate `my_func` with `p1` and `p2` as `pandas.Series`, which will also cause the expression to return a `pandas.Series` of the same size.\n", | |
| "\n", | |
| "NOTE: Spark will not accept NumPy types as return values from a regular UDF, which is why the function is redefined here with NumPy rather than using one definition for both cases. This is a known issue, see [SPARK-12157](https://issues.apache.org/jira/browse/SPARK-12157)." | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 5, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "1 loop, best of 3: 2 s per loop\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "from pyspark.sql.functions import pandas_udf\n", | |
| "\n", | |
| "my_udf = pandas_udf(my_func, DoubleType())\n", | |
| "\n", | |
| "result = df.withColumn(\"p\", my_udf(col(\"p1\"), col(\"p2\")))\n", | |
| "\n", | |
| "%timeit result.filter(\"p < 1.0\").count()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "collapsed": true | |
| }, | |
| "source": [ | |
| "## Make better use of Pandas and Numpy\n", | |
| "\n", | |
| "Since the inputs to your UDF are `pandas.Series`, you can use pandas and NumPy operations on the data and return either a `pandas.Series` or a NumPy array. For example, say we want to draw samples from a random distribution for data points with a specific label." | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 6, | |
| "metadata": { | |
| "scrolled": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "+-----+------+\n", | |
| "|label|counts|\n", | |
| "+-----+------+\n", | |
| "| 0| 7.0|\n", | |
| "| 1| 0.0|\n", | |
| "| 2| 0.0|\n", | |
| "| 0| 8.0|\n", | |
| "| 1| 0.0|\n", | |
| "| 2| 0.0|\n", | |
| "| 0| 7.0|\n", | |
| "+-----+------+\n", | |
| "only showing top 7 rows\n", | |
| "\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "import numpy as np\n", | |
| "import pandas as pd\n", | |
| "\n", | |
| "df = spark.range(1 << 20).toDF(\"id\") \\\n", | |
| " .selectExpr(\"(id % 3) AS label\")\n", | |
| "\n", | |
| "def sample(label):\n", | |
| "    \"\"\"\n", | |
| "    Sample selected data from a Poisson distribution\n", | |
| "    :param label: pandas.Series of data labels\n", | |
| "    \"\"\"\n", | |
| "\n", | |
| "    # use numpy to initialize an array of zeros\n", | |
| "    p = pd.Series(np.zeros(len(label)))\n", | |
| "\n", | |
| "    # use pandas to select data matching label \"0\"\n", | |
| "    idx0 = label == 0\n", | |
| "\n", | |
| "    # sample from numpy and assign to the selected data\n", | |
| "    p[idx0] = np.random.poisson(7, len(idx0))\n", | |
| "\n", | |
| "    # return the pandas series\n", | |
| "    return p\n", | |
| "\n", | |
| "sample_udf = pandas_udf(sample, DoubleType())\n", | |
| "\n", | |
| "result = df.withColumn(\"counts\", sample_udf(col(\"label\")))\n", | |
| "result.show(n=7)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": { | |
| "collapsed": true | |
| }, | |
| "outputs": [], | |
| "source": [] | |
| } | |
| ], | |
| "metadata": { | |
| "kernelspec": { | |
| "display_name": "Python 2", | |
| "language": "python", | |
| "name": "python2" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 2 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython2", | |
| "version": "2.7.13" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 2 | |
| } |
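The contract the notebook describes (one or more `pandas.Series` in, one `pandas.Series` of equal length out) can be checked locally without Spark. The following is a minimal sketch, not part of the original notebook: the names `my_func_scalar` and `my_func_vectorized` and the random sample data are assumptions made for illustration. It compares calling the function once per row against calling it once on the whole batch.

```python
import math

import numpy as np
import pandas as pd


def my_func_scalar(p1, p2):
    """Row-at-a-time version: p1 and p2 are plain Python floats."""
    w = 0.5
    return math.exp(math.log(p1) + math.log(p2) - math.log(w))


def my_func_vectorized(p1, p2):
    """Vectorized version: p1 and p2 are pandas.Series of equal length."""
    w = 0.5
    return np.exp(np.log(p1) + np.log(p2) - np.log(w))


# Hypothetical sample data standing in for the "p1" and "p2" columns.
rng = np.random.RandomState(0)
p1 = pd.Series(rng.uniform(0.1, 1.0, size=1000))
p2 = pd.Series(rng.uniform(0.1, 1.0, size=1000))

# One Python call per row, which is how a plain UDF is evaluated.
row_at_a_time = pd.Series([my_func_scalar(a, b) for a, b in zip(p1, p2)])

# One call for the whole batch, which is how a vectorized UDF is evaluated.
batch = my_func_vectorized(p1, p2)

same_length = len(batch) == len(p1)
results_match = bool(np.allclose(row_at_a_time, batch))
```

Both paths compute the same values; the difference is only in how many times Python is entered. This sketch says nothing about Arrow serialization or Spark scheduling, which also contribute to the timings shown in the notebook.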
Hi Bryan,
I have been able to make pandas_udfs work in an IPython notebook, but the same code does not work with spark-submit:
from pyspark.sql.functions import PandasUDFType
@pandas_udf("id int",PandasUDFType.GROUPED_MAP)
def featurize_udf(dfp):
    return pd.DataFrame({'id':[1,2]})
gdf = df.groupBy('username').apply(featurize_udf)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:333)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:322)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:177)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:121)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:158)
... 24 more
Is this ready for production?
Looks like the Jira ticket is closed and this is part of Spark 2.3, thanks!