Spark Scala example of creating and using Neural Networks
@asimjalis · Created February 18, 2016
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Sanity Check Spark Context"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"org.apache.spark.SparkContext@10bf2185"
]
},
"execution_count": 1,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sc"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"org.apache.spark.sql.hive.HiveContext@6e4d660"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sqlContext"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30)"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sc.parallelize(1 to 30).collect"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Neural Network On Iris Dataset\n",
"\n",
"Lets build a neural network on the Iris dataset that ships with Spark."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import org.apache.spark.ml.classification.MultilayerPerceptronClassifier\n",
"import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Load the data stored in LIBSVM format as a DataFrame.\n",
"val path = \"/Users/asimjalis/g/spark/data/mllib/sample_multiclass_classification_data.txt\"\n",
"val data = sqlContext.read.format(\"libsvm\").load(path)"
]
},
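{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (not part of the original run), we can peek at what the `libsvm` reader produced: a DataFrame with `label` and `features` columns."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// Sketch: inspect the loaded DataFrame (output not shown).\n",
"data.printSchema\n",
"data.show(5)"
]
},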
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"List(150, 96, 54)"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"// Split the data into train and test.\n",
"val splits = data.randomSplit(Array(0.6, 0.4), seed = 1234L)\n",
"val train = splits(0)\n",
"val test = splits(1)\n",
"List(data,train,test).map(_.count)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Specify layers for the NN. Input -> Hidden -> Hidden -> Output.\n",
"val layers = Array[Int](4, 5, 4, 3)"
]
},
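{
"cell_type": "markdown",
"metadata": {},
"source": [
"The sizes are tied to the data: 4 input units for the four iris features (sepal and petal length and width), two hidden layers of 5 and 4 units, and 3 output units for the three iris species."
]
},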
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Create trainer.\n",
"val trainer = new MultilayerPerceptronClassifier().\n",
" setLayers(layers).\n",
" setBlockSize(128).\n",
" setSeed(1234L).\n",
" setMaxIter(100)"
]
},
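{
"cell_type": "markdown",
"metadata": {},
"source": [
"A few notes on the parameters: `setBlockSize(128)` controls how many rows are stacked into matrices to speed up the computation, `setSeed` fixes the random weight initialization so runs are reproducible, and `setMaxIter(100)` caps the number of optimizer iterations."
]
},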
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"mlpc_bd731092d222"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"// Train model.\n",
"val model = trainer.fit(train)\n",
"model"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"precision = 0.9444444444444444\n",
"recall = 0.9444444444444444\n",
"f1 = 0.9445779112445778\n"
]
}
],
"source": [
"// Compute precision, recall, f1.\n",
"val result = model.transform(test)\n",
"val predictionAndLabels = result.select(\"prediction\", \"label\")\n",
"val evaluator = new MulticlassClassificationEvaluator()\n",
"Array(\"precision\",\"recall\",\"f1\").foreach { metric =>\n",
" evaluator.setMetricName(metric)\n",
" println(metric + \" = \" + evaluator.evaluate(predictionAndLabels)) \n",
"}"
]
},
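{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: the metric names `precision` and `recall` work with the Spark 1.6-era `MulticlassClassificationEvaluator` used here; later releases deprecate them in favor of `weightedPrecision`, `weightedRecall`, and `accuracy` (`f1` remains), so the strings may need adjusting on a newer Spark."
]
},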
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Precision, Recall, F1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"What is precision?"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Number of correct positive results divided number of all positive results.\n",
"\n",
"$\n",
"\\begin{align}\n",
"precision \n",
" & = \\dfrac{true\\ positives}{true\\ positives + false\\ positives} \\\\\n",
" & = \\dfrac{TP}{TP + FP} \\\\\n",
"\\end{align}\n",
"$"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"What is recall?"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Number of correct positive results divided by number of positive results that should have been returned.\n",
"\n",
"$\n",
"\\begin{align}\n",
"recall & = \\dfrac{true\\ positives}{true\\ positives + false\\ negatives} \\\\\n",
" & = \\dfrac{TP}{TP + FN} \\\\\n",
" & = \\dfrac{TP}{P}\n",
"\\end{align}\n",
"$"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"What is F1?"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Weighted average of precision and recall.\n",
"\n",
"$ f1 = \\dfrac{precision.recall}{precision+recall} $"
]
},
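{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the formulas concrete, here is a small worked example with made-up counts (not taken from the model above)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// Hypothetical confusion counts, for illustration only.\n",
"val (tp, fp, fn) = (90.0, 10.0, 20.0)\n",
"val precision = tp / (tp + fp)                          // 0.9\n",
"val recall = tp / (tp + fn)                             // ~0.818\n",
"val f1 = 2 * precision * recall / (precision + recall)  // ~0.857\n",
"println(s\"precision=$precision recall=$recall f1=$f1\")"
]
},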
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Fun with NNs"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// Imports\n",
"import org.apache.spark.ml.classification.MultilayerPerceptronClassifier\n",
"import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator\n",
"import org.apache.spark.mllib.linalg.Vectors\n",
"import org.apache.spark.mllib.linalg.Vector\n",
"import org.apache.spark.rdd.RDD\n",
"import org.apache.spark.mllib.regression.LabeledPoint\n",
"import org.apache.spark.mllib.linalg.DenseVector\n",
"import org.apache.spark.mllib.util.MLUtils\n",
"import sqlContext.implicits._"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Bit counting routines\n",
"\n",
"def log2(x:Double) = {\n",
" math.log(x)/math.log(2)\n",
"}\n",
"\n",
"val INT_BIT_LEN = (log2(Int.MaxValue) + 1).toInt + 1\n",
"\n",
"def bitAt(x:Int,pos:Int) = {\n",
" (x >> pos) & 0x1\n",
"}\n",
"\n",
"def bitLen(x:Int) = {\n",
" x match {\n",
" case 0 => 0\n",
" case n if n < 0 => INT_BIT_LEN\n",
" case _ => (log2(x) + 1).toInt\n",
" }\n",
"}\n",
"\n",
"def bitsToArray(x:Int) = {\n",
" (0 to bitLen(x) - 1).map(i => bitAt(x,i))\n",
"}\n",
"\n",
"def countBits(x:Int) = {\n",
" bitsToArray(x).sum\n",
"}"
]
},
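{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that `bitsToArray` returns bits least-significant first, so 2 becomes `Vector(0, 1)`, as the next cell shows."
]
},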
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"(0,Vector())\n",
"(1,Vector(1))\n",
"(2,Vector(0, 1))\n",
"(3,Vector(1, 1))\n",
"(4,Vector(0, 0, 1))\n",
"(5,Vector(1, 0, 1))\n",
"(6,Vector(0, 1, 1))\n",
"(7,Vector(1, 1, 1))\n",
"(8,Vector(0, 0, 0, 1))\n",
"(9,Vector(1, 0, 0, 1))\n"
]
}
],
"source": [
"// Example using bit routines.\n",
"Stream.from(0).map(i => (i,bitsToArray(i))).take(10).\n",
" toArray.foreach(println)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"10"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"bitLen(1023)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// Create plain RDD.\n",
"val rdd = sc.parallelize(Range(0,1023))\n",
"\n",
"// Convert to LabeledPoint RDD.\n",
"val data = rdd.\n",
" map(x => {\n",
" // Output will be number of bits.\n",
" val label:Double = if (countBits(x) > 5) 1.0 else 0.0\n",
"\n",
" // Pad array with zeros so all inputs same length.\n",
" val features = bitsToArray(x).padTo(10,0).map(_.toDouble).toArray\n",
"\n",
" // Create labeled point.\n",
" new LabeledPoint(label, new DenseVector(features))\n",
" }).toDF(\"label\",\"features\")"
]
},
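{
"cell_type": "markdown",
"metadata": {},
"source": [
"As an optional check (not in the original run), we can see how many examples land in each class."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"// Sketch: count examples per label (output not shown).\n",
"data.groupBy(\"label\").count.show"
]
},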
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Split data into training (70%) and test (30%).\n",
"val splits = data.randomSplit(Array(0.7, 0.3))\n",
"val (train, test) = (splits(0), splits(1))\n",
"\n",
"// Specify layers for NN. Input -> Hidden -> Output\n",
"val layers = Array[Int](10, 15, 2)\n",
"\n",
"// Create trainer. \n",
"val trainer = new MultilayerPerceptronClassifier().\n",
" setLayers(layers).\n",
" setBlockSize(128).\n",
" setSeed(1234L).\n",
" setMaxIter(100)"
]
},
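{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here the 10 input units correspond to the 10 feature bits and the 2 output units to the two classes (more than 5 bits set, or not)."
]
},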
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"mlpc_dbc638e4b209"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"// Train model.\n",
"val model = trainer.fit(train)\n",
"model"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"// Compute precision, recall, f1.\n",
"val result = model.transform(test)\n",
"val predictionAndLabels = result.select(\"prediction\", \"label\")\n",
"val evaluator = new MulticlassClassificationEvaluator()"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"precision = 1.0\n",
"recall = 1.0\n",
"f1 = 1.0\n"
]
}
],
"source": [
"Array(\"precision\",\"recall\",\"f1\").foreach { metric =>\n",
" evaluator.setMetricName(metric)\n",
" println(metric + \" = \" + evaluator.evaluate(predictionAndLabels))\n",
"}"
]
},
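{
"cell_type": "markdown",
"metadata": {},
"source": [
"The perfect scores are plausible here: the label is a deterministic function of the 10 input bits, and the test points come from the same small, fully enumerable space as the training data, so the network can fit the rule exactly."
]
}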
],
"metadata": {
"kernelspec": {
"display_name": "Toree",
"language": "",
"name": "toree"
},
"language_info": {
"name": "scala"
}
},
"nbformat": 4,
"nbformat_minor": 0
}