{
"cells": [{
"cell_type": "markdown",
"source": ["# ETL and K-Means\n \nThis lab will demonstrate loading data from a file, transforming that data into a form usable with the ML and MLlib libraries, and building a k-means clustering using both ML and MLlib.\n \nUpon completing this lab you should understand how to read from and write to files in Spark, convert between `RDDs` and `DataFrames`, and build a model using both the ML and MLlib APIs."],
"metadata": {}
}, {
"cell_type": "markdown",
"source": ["#### Loading the data\n \nFirst, we need to load data into Spark. We'll use a built-in utility to load a [libSVM file](www.csie.ntu.edu.tw/~cjlin/libsvm/faq.html), which is stored in an S3 bucket on AWS. We'll use `MLUtils.loadLibSVMFile` to load our file. Here are the [Python](http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.util.MLUtils.loadLibSVMFile) and [Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.util.MLUtils$) APIs."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["from pyspark.mllib.util import MLUtils\n\nbaseDir = '/mnt/ml-amsterdam/'\nirisPath = baseDir + 'iris.scale'\nirisRDD = MLUtils.loadLibSVMFile(sc, irisPath, minPartitions=20).cache()\n\n# We get back an RDD of LabeledPoints. Note that the libSVM format uses SparseVectors.\nirisRDD.take(5)"],
"metadata": {},
"outputs": [],
"execution_count": 3
}, {
"cell_type": "markdown",
"source": ["What if we wanted to see the first few lines of the libSVM file to see what the format looks like?"],
"metadata": {}
}, {
"cell_type": "code",
"source": ["sc.textFile(irisPath).take(5)"],
"metadata": {},
"outputs": [],
"execution_count": 5
}, {
"cell_type": "markdown",
"source": ["How is this data stored across partitions?"],
"metadata": {}
}, {
"cell_type": "code",
"source": ["print 'number of partitions: {0}'.format(irisRDD.getNumPartitions())\nelementsPerPart = (irisRDD\n .mapPartitionsWithIndex(lambda i,x: [(i, len(list(x)))])\n .collect())\nprint 'elements per partition: {0}\\n'.format(elementsPerPart)\nirisRDD.glom().take(1)"],
"metadata": {},
"outputs": [],
"execution_count": 7
}, {
"cell_type": "markdown",
"source": ["Let's convert this `RDD` of `LabeledPoints` to a `DataFrame`"],
"metadata": {}
}, {
"cell_type": "code",
"source": ["irisDF = irisRDD.toDF()\nirisDF.take(5)"],
"metadata": {},
"outputs": [],
"execution_count": 9
}, {
"cell_type": "code",
"source": ["irisDF.take(5)"],
"metadata": {},
"outputs": [],
"execution_count": 10
}, {
"cell_type": "code",
"source": ["irisDF.show(n=20, truncate=False)"],
"metadata": {},
"outputs": [],
"execution_count": 11
}, {
"cell_type": "code",
"source": ["display(irisDF)"],
"metadata": {},
"outputs": [],
"execution_count": 12
}, {
"cell_type": "code",
"source": ["print irisDF.schema, '\\n'\nirisDF.printSchema()"],
"metadata": {},
"outputs": [],
"execution_count": 13
}, {
"cell_type": "markdown",
"source": ["Why were we able to convert directly from a `LabeledPoint` to a `Row`?"],
"metadata": {}
}, {
"cell_type": "code",
"source": ["class Person(object):\n def __init__(self, name, age):\n self.name = name\n self.age = age\n\npersonDF = sqlContext.createDataFrame([Person('Bob', 28), Person('Julie', 35)])\ndisplay(personDF)"],
"metadata": {},
"outputs": [],
"execution_count": 15
}, {
"cell_type": "markdown",
"source": ["RDD containing Python object to a DataFrame\n \n[createDataFrame](https://github.com/apache/spark/blob/3a11e50e21ececbec9708eb487b08196f195cd87/python/pyspark/sql/context.py#L342)\n \n[_createFromRDD](https://github.com/apache/spark/blob/3a11e50e21ececbec9708eb487b08196f195cd87/python/pyspark/sql/context.py#L280)\n \n[_inferSchema](https://github.com/apache/spark/blob/3a11e50e21ececbec9708eb487b08196f195cd87/python/pyspark/sql/context.py#L221)\n \n[_infer_schema](https://github.com/apache/spark/blob/3a11e50e21ececbec9708eb487b08196f195cd87/python/pyspark/sql/types.py#L813)\n \n[back to _createFromRDD](https://github.com/apache/spark/blob/3a11e50e21ececbec9708eb487b08196f195cd87/python/pyspark/sql/context.py#L304)\n \n[toInternal](https://github.com/apache/spark/blob/3a11e50e21ececbec9708eb487b08196f195cd87/python/pyspark/sql/types.py#L533)\n \n[back to createDataFrame](https://github.com/apache/spark/blob/3a11e50e21ececbec9708eb487b08196f195cd87/python/pyspark/sql/context.py#L404)"],
"metadata": {}
}, {
"cell_type": "code",
"source": ["# Our object does have a __dict__ attribute\nprint Person('Bob', 28).__dict__"],
"metadata": {},
"outputs": [],
"execution_count": 17
}, {
"cell_type": "code",
"source": ["personDF = sqlContext.createDataFrame([Person('Bob', 28), Person('Julie', 35)])\ndisplay(personDF)"],
"metadata": {},
"outputs": [],
"execution_count": 18
}, {
"cell_type": "code",
"source": ["# Show the schema that was inferred\nprint personDF.schema\npersonDF.printSchema()"],
"metadata": {},
"outputs": [],
"execution_count": 19
}, {
"cell_type": "code",
"source": ["from collections import namedtuple\nPersonTuple = namedtuple('Person', ['name', 'age'])\npersonTupleDF = sqlContext.createDataFrame([PersonTuple('Bob', 28), PersonTuple('Julie', 35)])\ndisplay(personTupleDF)\n"],
"metadata": {},
"outputs": [],
"execution_count": 20
}, {
"cell_type": "code",
"source": ["personTupleDF.printSchema()"],
"metadata": {},
"outputs": [],
"execution_count": 21
}, {
"cell_type": "markdown",
"source": ["#### Transform the data\n \nIf you look at the data you'll notice that there are three values for label: 1, 2, and 3. Spark's machine learning algorithms expect a 0 indexed target variable, so we'll want to adjust those labels. This transformation is a simple expression where we'll subtract one from our `label` column.\n \nFor help reference the SQL Programming Guide portion on [dataframe-operations](http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframe-operations) or the Spark SQL [Python](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html) and [Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package) APIs. `select`, `col`, and `alias` can be used to accomplish this.\n \nThe resulting `DataFrame` should have two columns: one named `features` and another named `label`."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["# ANSWER\nfrom pyspark.sql.functions import col\n\nirisDFZeroIndex = irisDF.select('features', (col('label') - 1).alias('label'))\ndisplay(irisDFZeroIndex)"],
"metadata": {},
"outputs": [],
"execution_count": 23
}, {
"cell_type": "code",
"source": ["# TEST\nfrom test_helper import Test\nTest.assertEquals(irisDFZeroIndex.select('label').map(lambda r: r[0]).take(3), [0, 0, 0],\n 'incorrect value for irisDFZeroIndex')"],
"metadata": {},
"outputs": [],
"execution_count": 24
}, {
"cell_type": "markdown",
"source": ["You'll also notice that we have four values for features and that those values are stored as a `SparseVector`. We'll reduce those down to two values (for visualization purposes) and convert them to a `DenseVector`. To do that we'll need to create a `udf` and apply it to our dataset. Here's a `udf` reference for [Python](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.udf) and for [Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.UserDefinedFunction).\n \nNote that you can call the `toArray` method on a `SparseVector` to obtain an array, and you can convert an array into a `DenseVector` using the `Vectors.dense` method."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["# ANSWER\nfrom pyspark.sql.functions import udf\n# Note that VectorUDT and MatrixUDT are found in linalg while other types are in sql.types\n# VectorUDT should be the return type of the udf\nfrom pyspark.mllib.linalg import Vectors, VectorUDT\n\n# Take the first two values from a SparseVector and convert them to a DenseVector\nfirstTwoFeatures = udf(lambda sv: Vectors.dense(sv.toArray()[:2]), VectorUDT())\n\nirisTwoFeatures = irisDFZeroIndex.select(firstTwoFeatures('features').alias('features'), 'label').cache()\ndisplay(irisTwoFeatures)"],
"metadata": {},
"outputs": [],
"execution_count": 26
}, {
"cell_type": "code",
"source": ["# TEST\nTest.assertEquals(str(irisTwoFeatures.first()), 'Row(features=DenseVector([-0.5556, 0.25]), label=0.0)',\n 'incorrect definition of firstTwoFeatures')"],
"metadata": {},
"outputs": [],
"execution_count": 27
}, {
"cell_type": "markdown",
"source": ["## Part 2"],
"metadata": {}
}, {
"cell_type": "code",
"source": ["repr(irisTwoFeatures.first()[0])"],
"metadata": {},
"outputs": [],
"execution_count": 29
}, {
"cell_type": "markdown",
"source": ["Let's view our `irisTwoFeatures` `DataFrame`."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["irisTwoFeatures.take(5)"],
"metadata": {},
"outputs": [],
"execution_count": 31
}, {
"cell_type": "code",
"source": ["display(irisTwoFeatures)"],
"metadata": {},
"outputs": [],
"execution_count": 32
}, {
"cell_type": "markdown",
"source": ["#### Saving our DataFrame"],
"metadata": {}
}, {
"cell_type": "markdown",
"source": ["We'll be using parquet files to save our data. More information about the parquet file format can be found on [parquet.apache.org](https://parquet.apache.org/documentation/latest/)."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["help(irisTwoFeatures.write.parquet)"],
"metadata": {},
"outputs": [],
"execution_count": 35
}, {
"cell_type": "code",
"source": ["import uuid\nif 'parqUUID' not in locals():\n parqUUID = uuid.uuid1()\nirisTwoFeatures.write.mode('overwrite').parquet('/tmp/{0}/irisTwoFeatures.parquet'.format(parqUUID))"],
"metadata": {},
"outputs": [],
"execution_count": 36
}, {
"cell_type": "markdown",
"source": ["Note that we'll get a part file for each partition and that these files are compressed."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["#display(dbutils.fs.ls(baseDir + 'irisTwoFeatures.parquet'))\ndisplay(dbutils.fs.ls('/tmp/{0}/irisTwoFeatures.parquet'.format(parqUUID)))"],
"metadata": {},
"outputs": [],
"execution_count": 38
}, {
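"cell_type": "markdown",
"source": ["If fewer part files are desired, one option (a sketch, not part of the original lab) is to repartition the `DataFrame` before writing; the output path below is hypothetical and just mirrors the one used above."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["# Sketch: repartitioning to a single partition yields a single part file\n# (the 'irisTwoFeaturesSinglePart.parquet' path is hypothetical)\n(irisTwoFeatures\n .repartition(1)\n .write\n .mode('overwrite')\n .parquet('/tmp/{0}/irisTwoFeaturesSinglePart.parquet'.format(parqUUID)))"],
"metadata": {},
"outputs": [],
"execution_count": null
}, {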
"cell_type": "code",
"source": ["irisDFZeroIndex.write.mode('overwrite').parquet('/tmp/{0}/irisFourFeatures.parquet'.format(parqUUID))"],
"metadata": {},
"outputs": [],
"execution_count": 39
}, {
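"cell_type": "markdown",
"source": ["To confirm the round trip (a minimal sketch, not part of the original lab), we can read the saved Parquet data back into a `DataFrame` with `sqlContext.read.parquet`; the `irisTwoFeaturesLoaded` name is just for illustration."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["# Sketch: read the two-feature dataset back from the Parquet file written above\nirisTwoFeaturesLoaded = sqlContext.read.parquet('/tmp/{0}/irisTwoFeatures.parquet'.format(parqUUID))\nirisTwoFeaturesLoaded.take(5)"],
"metadata": {},
"outputs": [],
"execution_count": null
}, {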
"cell_type": "markdown",
"source": ["#### K-Means"],
"metadata": {}
}, {
"cell_type": "markdown",
"source": ["Now we'll build a k-means model using our two features and inspect the class hierarchy.\n \nWe'll build the k-means model using `KMeans`, an `ml` `Estimator`. Details can be found in the [Python](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.clustering) and [Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.clustering.package) APIs. Also, examples that use [PCA](http://spark.apache.org/docs/latest/ml-features.html#pca) and [logistic regression](http://spark.apache.org/docs/latest/ml-guide.html#example-estimator-transformer-and-param) can be found in the ML Programming Guide.\n \nMake sure to work with the `irisTwoFeatures` `DataFrame`."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["# ANSWER\nfrom pyspark.ml.clustering import KMeans\n\n# Create a KMeans Estimator and set k=3, seed=5, maxIter=20, initSteps=1\nkmeans = (KMeans()\n .setK(3)\n .setSeed(5)\n .setMaxIter(20)\n .setInitSteps(1))\n\n# Call fit on the estimator and pass in our DataFrame\nmodel = kmeans.fit(irisTwoFeatures)\n\n# Obtain the clusterCenters from the KMeansModel\ncenters = model.clusterCenters()\n\n# Use the model to transform the DataFrame by adding cluster predictions\ntransformed = model.transform(irisTwoFeatures)\n\nprint centers"],
"metadata": {},
"outputs": [],
"execution_count": 42
}, {
"cell_type": "code",
"source": ["# TEST\nimport numpy as np\nTest.assertTrue(np.allclose([ 0.35115296, -0.10691828], centers[0]),\n 'incorrect centers. check your params.')\nTest.assertEquals(transformed.select('prediction').map(lambda r: r[0]).take(4), [1,1,1,1],\n 'incorrect predictions')"],
"metadata": {},
"outputs": [],
"execution_count": 43
}, {
"cell_type": "markdown",
"source": ["## PART 3"],
"metadata": {}
}, {
"cell_type": "markdown",
"source": ["From the class hierarchy it is clear that `KMeans` is an `Estimator` while `KMeansModel` is a `Transformer`."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["print '*** KMeans instance inheritance partial tree ***'\nprint '\\n{0}\\n'.format(type(kmeans))\nprint '\\n'.join(map(str, type(kmeans).__bases__))\nprint '\\n{0}'.format(type(kmeans).__bases__[0].__bases__)\n\nprint '\\n\\n*** KMeansModel instance inheritance partial tree ***'\nprint '\\n{0}\\n'.format(type(model))\nprint '\\n'.join(map(str, type(model).__bases__)) + '\\n'\nprint '\\n'.join(map(str, type(model).__bases__[0].__bases__))\nprint '\\n{0}'.format(type(model).__bases__[0].__bases__[0].__bases__)"],
"metadata": {},
"outputs": [],
"execution_count": 46
}, {
"cell_type": "markdown",
"source": ["Let's print the three centroids of our model"],
"metadata": {}
}, {
"cell_type": "code",
"source": ["print centers"],
"metadata": {},
"outputs": [],
"execution_count": 48
}, {
"cell_type": "markdown",
"source": ["Note that our predicted cluster is appended, as a column, to our input `DataFrame`. Here it would be desirable to see consistency between label and prediction. These don't need to be the same number but if label 0 is usually predicted to be cluster 1 that would indicate that our unsupervised learning is naturally grouping the data into species."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["display(transformed)"],
"metadata": {},
"outputs": [],
"execution_count": 50
}, {
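"cell_type": "markdown",
"source": ["One rough way to check that consistency (a sketch, not part of the original lab) is to count how many points fall into each (label, prediction) pair."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["# Sketch: tally (label, prediction) pairs to see how clusters line up with species labels\ntransformed.groupBy('label', 'prediction').count().orderBy('label', 'prediction').show()"],
"metadata": {},
"outputs": [],
"execution_count": null
}, {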
"cell_type": "markdown",
"source": ["#### K-Means Visualized"],
"metadata": {}
}, {
"cell_type": "code",
"source": ["modelCenters = []\niterations = [0, 2, 4, 7, 10, 20]\nfor i in iterations:\n kmeans = KMeans(k=3, seed=5, maxIter=i, initSteps=1)\n model = kmeans.fit(irisTwoFeatures)\n modelCenters.append(model.clusterCenters())"],
"metadata": {},
"outputs": [],
"execution_count": 52
}, {
"cell_type": "code",
"source": ["print 'modelCenters:'\nfor centroids in modelCenters:\n print centroids"],
"metadata": {},
"outputs": [],
"execution_count": 53
}, {
"cell_type": "code",
"source": ["import matplotlib.pyplot as plt\nimport matplotlib.cm as cm\nimport numpy as np\n\ndef prepareSubplot(xticks, yticks, figsize=(10.5, 6), hideLabels=False, gridColor='#999999',\n gridWidth=1.0, subplots=(1, 1)):\n \"\"\"Template for generating the plot layout.\"\"\"\n plt.close()\n fig, axList = plt.subplots(subplots[0], subplots[1], figsize=figsize, facecolor='white',\n edgecolor='white')\n if not isinstance(axList, np.ndarray):\n axList = np.array([axList])\n\n for ax in axList.flatten():\n ax.axes.tick_params(labelcolor='#999999', labelsize='10')\n for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:\n axis.set_ticks_position('none')\n axis.set_ticks(ticks)\n axis.label.set_color('#999999')\n if hideLabels: axis.set_ticklabels([])\n ax.grid(color=gridColor, linewidth=gridWidth, linestyle='-')\n map(lambda position: ax.spines[position].set_visible(False), ['bottom', 'top', 'left', 'right'])\n\n if axList.size == 1:\n axList = axList[0] # Just return a single axes object for a regular plot\n return fig, axList"],
"metadata": {},
"outputs": [],
"execution_count": 54
}, {
"cell_type": "markdown",
"source": ["Display the clustroid centers with the original labeled points and circles representing distance from the centroids."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["data = irisTwoFeatures.collect()\nfeatures, labels = zip(*data)\nx, y = zip(*features)\n\ncentroidX, centroidY = zip(*centers)\ncolorMap = 'Set1' # was 'Set2', 'Set1', 'Dark2', 'winter'\n\nfig, ax = prepareSubplot(np.arange(-1, 1.1, .4), np.arange(-1, 1.1, .4), figsize=(8,6))\nplt.scatter(x, y, s=14**2, c=labels, edgecolors='#8cbfd0', alpha=0.80, cmap=colorMap)\nplt.scatter(centroidX, centroidY, s=22**2, marker='*', c='yellow')\ncmap = cm.get_cmap(colorMap)\n\ncolorIndex = [.99, 0., .5]\nfor i, (x,y) in enumerate(centers):\n print cmap(colorIndex[i])\n for size in [.10, .20, .30, .40, .50]:\n circle1=plt.Circle((x,y),size,color=cmap(colorIndex[i]), alpha=.10, linewidth=2)\n ax.add_artist(circle1)\n\nax.set_xlabel('Sepal Length'), ax.set_ylabel('Sepal Width')\ndisplay(fig)"],
"metadata": {},
"outputs": [],
"execution_count": 56
}, {
"cell_type": "markdown",
"source": ["Visualize how the clustroid centers move as the k-means algorithm iterates."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["x, y = zip(*features)\n\noldCentroidX, oldCentroidY = None, None\n\nfig, axList = prepareSubplot(np.arange(-1, 1.1, .4), np.arange(-1, 1.1, .4), figsize=(11, 15),\n subplots=(3, 2))\naxList = axList.flatten()\n\nfor i,ax in enumerate(axList[:]):\n ax.set_title('K-means for {0} iterations'.format(iterations[i]), color='#999999')\n centroids = modelCenters[i]\n centroidX, centroidY = zip(*centroids)\n\n ax.scatter(x, y, s=10**2, c=labels, edgecolors='#8cbfd0', alpha=0.80, cmap=colorMap)\n ax.scatter(centroidX, centroidY, s=16**2, marker='*', c='yellow')\n if oldCentroidX and oldCentroidY:\n ax.scatter(oldCentroidX, oldCentroidY, s=16**2, marker='*', c='grey')\n cmap = cm.get_cmap(colorMap)\n\n colorIndex = [.99, 0., .5]\n for i, (x1,y1) in enumerate(centroids):\n print cmap(colorIndex[i])\n circle1=plt.Circle((x1,y1),.35,color=cmap(colorIndex[i]), alpha=.40)\n ax.add_artist(circle1)\n\n ax.set_xlabel('Sepal Length'), ax.set_ylabel('Sepal Width')\n oldCentroidX, oldCentroidY = centroidX, centroidY\n#axList[-1].cla()\n#axList[-1].get_yaxis().set_ticklabels([])\n#axList[-1].get_xaxis().set_ticklabels([])\n\nplt.tight_layout()\n\ndisplay(fig)"],
"metadata": {},
"outputs": [],
"execution_count": 58
}, {
"cell_type": "markdown",
"source": ["Summary plot of the centroid movement."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["centroidX, centroidY = zip(*centers)\n\n# generate layout and plot data\ndef plotKMeansTrack(x=x, y=y, labels=labels):\n fig, ax = prepareSubplot(np.arange(-1, 1.1, .4), np.arange(-1, 1.1, .4), figsize=(8, 6))\n ax.set_ylim(-1, 1), ax.set_xlim(-1, 1)\n #plt.scatter(x, y, s=14**2, c=labels, edgecolors='#8cbfd0', alpha=0.80, cmap='winter')\n cmap = cm.get_cmap(colorMap)\n\n colorIndex = [.99, 0.0, .5]\n\n alphas = [.05, .10, .15, .20, .30, .40]\n sizes = [8, 12, 16, 20, 24, 28]\n\n for iteration, centroids in enumerate(modelCenters):\n centroidX, centroidY = zip(*centroids)\n color = 'lightgrey' if iteration < 5 else 'yellow'\n plt.scatter(centroidX, centroidY, s=sizes[iteration]**2, marker='*', c=color)\n\n for i, (x,y) in enumerate(centroids):\n print cmap(colorIndex[i])\n circle1=plt.Circle((x,y),.35,color=cmap(colorIndex[i%3]), alpha=alphas[iteration], linewidth=2.0)\n ax.add_artist(circle1)\n\n\n ax.set_xlabel('Sepal Length'), ax.set_ylabel('Sepal Width')\n display(fig)\n\nplotKMeansTrack(centroidX, centroidY)"],
"metadata": {},
"outputs": [],
"execution_count": 60
}, {
"cell_type": "markdown",
"source": ["#### Using MLlib instead of ML"],
"metadata": {}
}, {
"cell_type": "markdown",
"source": ["First, convert our `DataFrame` into an `RDD`."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["# Note that .rdd is not necessary, but is here to illustrate that we are working with an RDD\nirisTwoFeaturesRDD = (irisTwoFeatures\n .rdd\n .map(lambda r: (r[1], r[0])))\nirisTwoFeaturesRDD.take(2)"],
"metadata": {},
"outputs": [],
"execution_count": 63
}, {
"cell_type": "markdown",
"source": ["Then import MLlib's `KMeans` as `MLlibKMeans` to differentiate it from `ml.KMeans`"],
"metadata": {}
}, {
"cell_type": "code",
"source": ["from pyspark.mllib.clustering import KMeans as MLlibKMeans\n\nhelp(MLlibKMeans)"],
"metadata": {},
"outputs": [],
"execution_count": 65
}, {
"cell_type": "markdown",
"source": ["Finally, let's build our k-means model. Here are the relevant [Python](http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeans) and [Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.clustering.KMeans) APIs.\n \nMake sure to set `k` to 3, `maxIterations` to 20, `seed` to 5, and `initializationSteps` to 1. Also, note that were returned an `RDD` with (label, feature) tuples. You'll just need the features, which you can obtain by calling `.values()` on `irisTwoFeaturesRDD`."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["# ANSWER\nmllibKMeans = MLlibKMeans.train(irisTwoFeaturesRDD.values(), 3, maxIterations=20, seed=5,\n initializationSteps=1)\n\nprint 'mllib: {0}'.format(mllibKMeans.clusterCenters)\nprint 'ml: {0}'.format(centers)"],
"metadata": {},
"outputs": [],
"execution_count": 67
}, {
"cell_type": "code",
"source": ["# TEST\nimport numpy as np\nTest.assertTrue(np.allclose(mllibKMeans.clusterCenters, centers), \"Your mllib and ml models don't match\")"],
"metadata": {},
"outputs": [],
"execution_count": 68
}, {
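"cell_type": "markdown",
"source": ["As an extra check (a sketch, not part of the original lab, and assuming your Spark version's MLlib `KMeansModel` exposes `computeCost`), we can compute the within-set sum of squared errors (WSSSE) of the model on our two-feature data."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["# Sketch: within-set sum of squared errors for the mllib model on the two-feature data\nwssse = mllibKMeans.computeCost(irisTwoFeaturesRDD.values())\nprint 'WSSSE: {0}'.format(wssse)"],
"metadata": {},
"outputs": [],
"execution_count": null
}, {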
"cell_type": "markdown",
"source": ["Now that we have an `mllibKMeans` model how do we generate predictions and compare those to our labels?"],
"metadata": {}
}, {
"cell_type": "code",
"source": ["predictionsRDD = mllibKMeans.predict(irisTwoFeaturesRDD.values())\nprint predictionsRDD.take(5)"],
"metadata": {},
"outputs": [],
"execution_count": 70
}, {
"cell_type": "markdown",
"source": ["We'll use `zip` to combine the feature and prediction RDDs together. Note that zip assumes that the RDDs have the same number of partitions and that each partition has the same number of elements. This is true here as our predictions were the result of a `map` operation on the feature RDD."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["combinedRDD = irisTwoFeaturesRDD.zip(predictionsRDD)\ncombinedRDD.take(5)"],
"metadata": {},
"outputs": [],
"execution_count": 72
}, {
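"cell_type": "markdown",
"source": ["If the partition-alignment assumption behind `zip` did not hold, an index-based join is a safe fallback (a sketch, not part of the original lab; the variable names are just for illustration)."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["# Sketch: pair features with predictions via zipWithIndex + join instead of zip\nindexedFeatures = irisTwoFeaturesRDD.zipWithIndex().map(lambda x: (x[1], x[0]))\nindexedPredictions = predictionsRDD.zipWithIndex().map(lambda x: (x[1], x[0]))\ncombinedViaJoin = indexedFeatures.join(indexedPredictions).values()\nprint combinedViaJoin.take(5)"],
"metadata": {},
"outputs": [],
"execution_count": null
}, {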
"cell_type": "markdown",
"source": ["Let's compare this to the result from `ml`."],
"metadata": {}
}, {
"cell_type": "code",
"source": ["display(transformed)"],
"metadata": {},
"outputs": [],
"execution_count": 74
}, {
"cell_type": "markdown",
"source": ["#### How do the `ml` and `mllib` implementations differ?"],
"metadata": {}
}, {
"cell_type": "code",
"source": ["import inspect\nprint inspect.getsource(kmeans.fit)"],
"metadata": {},
"outputs": [],
"execution_count": 76
}, {
"cell_type": "code",
"source": ["print inspect.getsource(kmeans._fit)"],
"metadata": {},
"outputs": [],
"execution_count": 77
}, {
"cell_type": "code",
"source": ["print inspect.getsource(kmeans._fit_java)"],
"metadata": {},
"outputs": [],
"execution_count": 78
}, {
"cell_type": "markdown",
"source": ["The `ml` version of k-means is just a wrapper to MLlib's implementation. Let's take a look here:\n[org.apache.spark.ml.clustering.KMeans source](https://github.com/apache/spark/blob/e1e77b22b3b577909a12c3aa898eb53be02267fd/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala#L192).\n \nHow is $ being used in this function? `Param` [source code](https://github.com/apache/spark/blob/2b574f52d7bf51b1fe2a73086a3735b633e9083f/mllib/src/main/scala/org/apache/spark/ml/param/params.scala#L643) has the answer.\n \nWhich is different than $'s usage for SQL columns where it is a [string interpolator that returns a ColumnName](https://github.com/apache/spark/blob/3d683a139b333456a6bd8801ac5f113d1ac3fd18/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L386)"],
"metadata": {}
}],
"metadata": {
"name": "2-etl-kmeans_answers",
"notebookId": 34341
},
"nbformat": 4,
"nbformat_minor": 0
}