Spark Scala example of creating and using Neural Networks
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Sanity Check Spark Context"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"org.apache.spark.SparkContext@10bf2185"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sc"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"org.apache.spark.sql.hive.HiveContext@6e4d660"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sqlContext"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30)"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sc.parallelize(1 to 30).collect"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Neural Network On Iris Dataset\n", | |
"\n", | |
"Lets build a neural network on the Iris dataset that ships with Spark." | |
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import org.apache.spark.ml.classification.MultilayerPerceptronClassifier\n",
"import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Load the data stored in LIBSVM format as a DataFrame.\n",
"val path = \"/Users/asimjalis/g/spark/data/mllib/sample_multiclass_classification_data.txt\"\n",
"val data = sqlContext.read.format(\"libsvm\").load(path)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"List(150, 96, 54)"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"// Split the data into train and test.\n",
"val splits = data.randomSplit(Array(0.6, 0.4), seed = 1234L)\n",
"val train = splits(0)\n",
"val test = splits(1)\n",
"List(data,train,test).map(_.count)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Specify layers for the NN. Input -> Hidden -> Hidden -> Output.\n",
"val layers = Array[Int](4, 5, 4, 3)"
]
},
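{
"cell_type": "markdown",
"metadata": {},
"source": [
"The input layer has 4 nodes (one per iris feature) and the output layer has 3 (one per class); the hidden layer sizes of 5 and 4 are arbitrary choices that can be tuned."
]
},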
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Create trainer.\n",
"val trainer = new MultilayerPerceptronClassifier().\n",
" setLayers(layers).\n",
" setBlockSize(128).\n",
" setSeed(1234L).\n",
" setMaxIter(100)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"mlpc_bd731092d222"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"// Train model.\n",
"val model = trainer.fit(train)\n",
"model"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"precision = 0.9444444444444444\n",
"recall = 0.9444444444444444\n",
"f1 = 0.9445779112445778\n"
]
}
],
"source": [
"// Compute precision, recall, f1.\n",
"val result = model.transform(test)\n",
"val predictionAndLabels = result.select(\"prediction\", \"label\")\n",
"val evaluator = new MulticlassClassificationEvaluator()\n",
"Array(\"precision\",\"recall\",\"f1\").foreach { metric =>\n",
" evaluator.setMetricName(metric)\n",
" println(metric + \" = \" + evaluator.evaluate(predictionAndLabels))\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Precision, Recall, F1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"What is precision?"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Number of correct positive results divided number of all positive results.\n", | |
"\n", | |
"$\n", | |
"\\begin{align}\n", | |
"precision \n", | |
" & = \\dfrac{true\\ positives}{true\\ positives + false\\ positives} \\\\\n", | |
" & = \\dfrac{TP}{TP + FP} \\\\\n", | |
"\\end{align}\n", | |
"$" | |
] | |
}, | |
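{
"cell_type": "markdown",
"metadata": {},
"source": [
"For example, with hypothetical counts of 40 true positives and 10 false positives, precision is $\\dfrac{40}{40 + 10} = 0.8$."
]
},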
{
"cell_type": "markdown",
"metadata": {},
"source": [
"What is recall?"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Number of correct positive results divided by number of positive results that should have been returned.\n", | |
"\n", | |
"$\n", | |
"\\begin{align}\n", | |
"recall & = \\dfrac{true\\ positives}{true\\ positives + false\\ negatives} \\\\\n", | |
" & = \\dfrac{TP}{TP + FN} \\\\\n", | |
" & = \\dfrac{TP}{P}\n", | |
"\\end{align}\n", | |
"$" | |
] | |
}, | |
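{
"cell_type": "markdown",
"metadata": {},
"source": [
"Continuing the hypothetical example, with 40 true positives and 20 false negatives, recall is $\\dfrac{40}{40 + 20} \\approx 0.667$."
]
},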
{
"cell_type": "markdown",
"metadata": {},
"source": [
"What is F1?"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Weighted average of precision and recall.\n", | |
"\n", | |
"$ f1 = \\dfrac{precision.recall}{precision+recall} $" | |
]
},
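{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch in plain Scala, computing all three metrics from the hypothetical counts above (TP = 40, FP = 10, FN = 20):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// Hypothetical counts, for illustration only.\n",
"val (tp, fp, fn) = (40.0, 10.0, 20.0)\n",
"val precision = tp / (tp + fp) // 0.8\n",
"val recall = tp / (tp + fn) // ~0.667\n",
"val f1 = 2 * precision * recall / (precision + recall) // ~0.727\n",
"println(s\"precision = $precision, recall = $recall, f1 = $f1\")"
]
},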
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Fun with NNs"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// Imports\n",
"import org.apache.spark.ml.classification.MultilayerPerceptronClassifier\n",
"import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator\n",
"import org.apache.spark.mllib.linalg.Vectors\n",
"import org.apache.spark.mllib.linalg.Vector\n",
"import org.apache.spark.rdd.RDD\n",
"import org.apache.spark.mllib.regression.LabeledPoint\n",
"import org.apache.spark.mllib.linalg.DenseVector\n",
"import org.apache.spark.mllib.util.MLUtils\n",
"import sqlContext.implicits._"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Bit counting routines\n",
"\n",
"def log2(x:Double) = {\n",
" math.log(x)/math.log(2)\n",
"}\n",
"\n",
"val INT_BIT_LEN = (log2(Int.MaxValue) + 1).toInt + 1\n",
"\n",
"def bitAt(x:Int,pos:Int) = {\n",
" (x >> pos) & 0x1\n",
"}\n",
"\n",
"def bitLen(x:Int) = {\n",
" x match {\n",
" case 0 => 0\n",
" case n if n < 0 => INT_BIT_LEN\n",
" case _ => (log2(x) + 1).toInt\n",
" }\n",
"}\n",
"\n",
"def bitsToArray(x:Int) = {\n",
" (0 to bitLen(x) - 1).map(i => bitAt(x,i))\n",
"}\n",
"\n",
"def countBits(x:Int) = {\n",
" bitsToArray(x).sum\n",
"}"
]
},
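{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (illustrative, not part of the original gist), countBits should agree with Java's built-in Integer.bitCount on non-negative inputs:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// Verify countBits against java.lang.Integer.bitCount.\n",
"(0 to 1000).foreach(x => assert(countBits(x) == Integer.bitCount(x)))"
]
},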
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"(0,Vector())\n",
"(1,Vector(1))\n",
"(2,Vector(0, 1))\n",
"(3,Vector(1, 1))\n",
"(4,Vector(0, 0, 1))\n",
"(5,Vector(1, 0, 1))\n",
"(6,Vector(0, 1, 1))\n",
"(7,Vector(1, 1, 1))\n",
"(8,Vector(0, 0, 0, 1))\n",
"(9,Vector(1, 0, 0, 1))\n"
]
}
],
"source": [
"// Example using bit routines.\n",
"Stream.from(0).map(i => (i,bitsToArray(i))).take(10).\n",
" toArray.foreach(println)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"10"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"bitLen(1023)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// Create plain RDD.\n", | |
"val rdd = sc.parallelize(Range(0,1023))\n", | |
"\n", | |
"// Convert to LabeledPoint RDD.\n", | |
"val data = rdd.\n", | |
" map(x => {\n", | |
" // Output will be number of bits.\n", | |
" val label:Double = if (countBits(x) > 5) 1.0 else 0.0\n", | |
"\n", | |
" // Pad array with zeros so all inputs same length.\n", | |
" val features = bitsToArray(x).padTo(10,0).map(_.toDouble).toArray\n", | |
"\n", | |
" // Create labeled point.\n", | |
" new LabeledPoint(label, new DenseVector(features))\n", | |
" }).toDF(\"label\",\"features\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 16, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"// Split data into training (70%) and test (30%).\n", | |
"val splits = data.randomSplit(Array(0.7, 0.3))\n", | |
"val (train, test) = (splits(0), splits(1))\n", | |
"\n", | |
"// Specify layers for NN. Input -> Hidden -> Output\n", | |
"val layers = Array[Int](10, 15, 2)\n", | |
"\n", | |
"// Create trainer. \n", | |
"val trainer = new MultilayerPerceptronClassifier().\n", | |
" setLayers(layers).\n", | |
" setBlockSize(128).\n", | |
" setSeed(1234L).\n", | |
" setMaxIter(100)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 17, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"mlpc_dbc638e4b209" | |
] | |
}, | |
"execution_count": 17, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"// Train model.\n", | |
"val model = trainer.fit(train)\n", | |
"model" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 18, | |
"metadata": { | |
"collapsed": true | |
}, | |
"outputs": [], | |
"source": [ | |
"// Compute precision, recall, f1.\n", | |
"val result = model.transform(test)\n", | |
"val predictionAndLabels = result.select(\"prediction\", \"label\")\n", | |
"val evaluator = new MulticlassClassificationEvaluator()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 19, | |
"metadata": { | |
"collapsed": false | |
}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"precision = 1.0\n", | |
"recall = 1.0\n", | |
"f1 = 1.0\n" | |
] | |
} | |
], | |
"source": [ | |
"Array(\"precision\",\"recall\",\"f1\").foreach { metric =>\n", | |
" evaluator.setMetricName(metric)\n", | |
" println(metric + \" = \" + evaluator.evaluate(predictionAndLabels))\n", | |
"}" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Toree", | |
"language": "", | |
"name": "toree" | |
}, | |
"language_info": { | |
"name": "scala" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 0 | |
} |