Running benchm-ml benchmark for random forest on Spark, using soft predictions to get better AUC
Here are 2 code snippets:
(1) Compute one-hot encoded data for Spark, using the data generated by https://github.com/szilard/benchm-ml/blob/master/0-init/2-gendata.txt
(2) Run MLlib, computing soft predictions by hand.
I ran these with Spark 1.4, and they should work for 1.5 as well.
Note: There's no real need to switch to DataFrames yet for benchmarking. Both the RDD and DataFrame APIs use the same underlying implementation. (I hope to improve on that in Spark 1.6 if there is time.)
Ran on an EC2 cluster with 4 workers with 9.6GB memory each, and 8 partitions for the training RDD.
For the 1M dataset, training the forest took 2080.814977193 sec and achieved AUC 0.7129779357732448 on the test set.
(1) Code for one-hot encoding

import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.ml.Pipeline
import org.apache.spark.mllib.linalg.Vector

// Paths
val origDataDir = "/mnt/mllib/regression/flightTimes/prepped"
val origTrainPath = origDataDir + "/train-10m.csv"
val origTestPath = origDataDir + "/test.csv"
val newDataDir = "/mnt/mllib/regression/flightTimes/spark"
val newTrainPath = newDataDir + "/spark-train-10m.FIXED.parquet"
val newTestPath = newDataDir + "/spark-test.FIXED.parquet"

// Read CSV as Spark DataFrames
val trainDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(origTrainPath)
val testDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load(origTestPath)
// Combine train and test temporarily, so the categorical indexers and encoders
// are fit on all category values and train/test share a consistent mapping.
val fullDF = trainDF.withColumn("isTrain", lit(true)).unionAll(testDF.withColumn("isTrain", lit(false)))
display(fullDF)

// Feature types
val vars_categ = Array("Month","DayofMonth","DayOfWeek","UniqueCarrier", "Origin", "Dest")
val vars_num = Array("DepTime","Distance")
val vars_num_double = vars_num.map(_ + "_double")
val var_y = "dep_delayed_15min"

// Cast column types as needed
val fullDF2 = fullDF.withColumn("DepTime_double", col("DepTime").cast(DoubleType)).withColumn("Distance_double", col("Distance").cast(DoubleType))
display(fullDF2)

// Assemble Pipeline for featurization.
// Need to use StringIndexer before OneHotEncoder, since the encoder does not yet support String input (but it will).
val stringIndexers = vars_categ.map(colName => new StringIndexer().setInputCol(colName).setOutputCol(colName + "_indexed"))
val oneHotEncoders = vars_categ.map(colName => new OneHotEncoder().setInputCol(colName + "_indexed").setOutputCol(colName + "_ohe").setDropLast(false))
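// Illustration (hypothetical values): if "DayOfWeek" is indexed into 7 distinct values,
// a row indexed as 2.0 becomes the size-7 sparse vector (7, [2], [1.0]);
// setDropLast(false) keeps a slot for every category instead of dropping the last one.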
val catAssembler = new VectorAssembler().setInputCols(vars_categ.map(_ + "_ohe")).setOutputCol("catFeatures")
val featureAssembler = new VectorAssembler().setInputCols(vars_num_double :+ "catFeatures").setOutputCol("features")
val labelIndexer = new StringIndexer().setInputCol(var_y).setOutputCol("label")
val pipeline = new Pipeline().setStages(stringIndexers ++ oneHotEncoders ++ Array(catAssembler, featureAssembler, labelIndexer))

// Compute features.
val pipelineModel = pipeline.fit(fullDF2)
val transformedDF = pipelineModel.transform(fullDF2)
display(transformedDF)

// Split back into train, test
val finalTrainDF = transformedDF.where(col("isTrain"))
val finalTestDF = transformedDF.where(!col("isTrain"))

// Save Spark DataFrames as Parquet
finalTrainDF.write.mode("overwrite").parquet(newTrainPath)
finalTestDF.write.mode("overwrite").parquet(newTestPath)
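An optional sanity check (not part of the original run): read the Parquet files back and confirm the row counts and assembled feature size look right.

// Reload the saved Parquet and inspect counts and feature vector size.
val checkTrain = sqlContext.read.parquet(newTrainPath)
val checkTest = sqlContext.read.parquet(newTestPath)
println(s"train rows: ${checkTrain.count()}, test rows: ${checkTest.count()}")
println("feature vector size: " + checkTrain.select("features").first().getAs[Vector](0).size)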
(2) AUC/accuracy

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.rdd.RDD  // needed for the RDD[LabeledPoint] return type below
import org.apache.spark.sql.{DataFrame, Row}
// Paths
val dataDir = "/mnt/mllib/regression/flightTimes/spark"
val trainDataPath = dataDir + "/spark-train-0.1m.FIXED.parquet"
val testDataPath = dataDir + "/spark-test.FIXED.parquet"

// Load DataFrame, and convert to RDD of LabeledPoints
def toLP(df: DataFrame): RDD[LabeledPoint] = {
  df.select("label", "features").map { case Row(label: Double, features: Vector) => LabeledPoint(label, features) }.repartition(8)
}
val train = toLP(sqlContext.read.parquet(trainDataPath)).cache()
val test = toLP(sqlContext.read.parquet(testDataPath)).cache()
(train.count(), test.count())  // force materialization of the cached RDDs
// Train model
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val numTrees = 500
val featureSubsetStrategy = "sqrt"
val impurity = "gini"
val maxDepth = 20
val maxBins = 50
val now = System.nanoTime
val model = RandomForest.trainClassifier(train, numClasses, categoricalFeaturesInfo,
  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
val elapsed = (System.nanoTime - now) / 1e9
elapsed
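If you want to score in a later session without retraining, spark.mllib tree-ensemble models can be persisted (the path here is just a placeholder):

// Optional: save and reload the trained forest.
model.save(sc, dataDir + "/rf-model")
val sameModel = RandomForestModel.load(sc, dataDir + "/rf-model")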
// Compute soft predictions. For spark.mllib trees, this works for binary classification.
// Spark 1.5 will include it for multiclass under the spark.ml API.
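// The ensemble score computed below is the average of the trees' class-1 probabilities:
//   score(x) = (1 / numTrees) * sum_t P_t(y = 1 | x)
// That continuous score is what lets BinaryClassificationMetrics sweep thresholds for AUC,
// unlike the hard majority-vote fraction (kept as a commented-out line further down).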
import org.apache.spark.mllib.tree.configuration.FeatureType.Continuous
import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node}

// Walk a single tree to a leaf and return P(y = 1 | x) from the leaf's class distribution.
def softPredict(node: Node, features: Vector): Double = {
  if (node.isLeaf) {
    if (node.predict.predict == 1.0) node.predict.prob else 1.0 - node.predict.prob
  } else {
    if (node.split.get.featureType == Continuous) {
      if (features(node.split.get.feature) <= node.split.get.threshold) {
        softPredict(node.leftNode.get, features)
      } else {
        softPredict(node.rightNode.get, features)
      }
    } else {
      if (node.split.get.categories.contains(features(node.split.get.feature))) {
        softPredict(node.leftNode.get, features)
      } else {
        softPredict(node.rightNode.get, features)
      }
    }
  }
}
def softPredict(dt: DecisionTreeModel, features: Vector): Double = {
  softPredict(dt.topNode, features)
}
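For intuition, an illustrative spot check comparing one tree's soft score to its hard 0/1 prediction on a single test point:

val pt = test.first()
println("soft: " + softPredict(model.trees(0), pt.features))
println("hard: " + model.trees(0).predict(pt.features))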
// Compute AUC
val scoreAndLabels = test.map { point =>
  // Hard-vote alternative: fraction of trees predicting class 1.
  //val score = model.trees.map(_.predict(point.features)).filter(_ > 0).size.toDouble / model.numTrees
  val score = model.trees.map(tree => softPredict(tree, point.features)).sum / model.numTrees
  (score, point.label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
metrics.areaUnderROC()
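The section heading mentions accuracy as well; one way to get it from the same scores (assuming a 0.5 threshold on the soft score) is:

// Accuracy by thresholding the soft score at 0.5.
val accuracy = scoreAndLabels.map { case (score, label) =>
  if ((if (score >= 0.5) 1.0 else 0.0) == label) 1.0 else 0.0
}.mean()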