Last active
March 5, 2018 10:16
-
-
Save eavidan/4f5fd5818da07e846e2cfc30dac37b28 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.ml.Pipeline | |
import org.apache.spark.ml.classification.RandomForestClassifier | |
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator} | |
import org.apache.spark.ml.feature._ | |
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession} | |
import org.apache.spark.sql.functions._ | |
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} | |
/**
 * Trains a Random Forest classifier (spark-ml) that predicts the binary `response` column of
 * `data.csv`.
 *
 * Steps: drop mostly-empty columns -> impute missing values -> TF-IDF on `feature_14` ->
 * one-hot encode string columns -> assemble a feature vector -> 3-fold cross-validated
 * random forest -> report the error on a held-out test split.
 */
object BigPanda {

  /** Column the model predicts; it must never be fed back in as a feature. */
  private val LabelCol = "response"

  /** Free-text column that is featurized with TF-IDF. */
  private val TextCol = "feature_14"

  /** Columns whose fraction of nulls is at or above this value are dropped. */
  private val MaxMissingFraction = 0.7

  /** `DataFrame.dtypes` names of the fixed-width numeric types (DecimalType is matched by prefix). */
  private val NumericTypeNames =
    Set("ByteType", "ShortType", "IntegerType", "LongType", "FloatType", "DoubleType")

  /** True when a `DataFrame.dtypes` type name denotes a numeric column. */
  private def isNumericDType(dtype: String): Boolean =
    NumericTypeNames.contains(dtype) || dtype.startsWith("DecimalType")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("BigPanda").getOrCreate()
    try {
      val data = spark.read.option("header", "true").option("inferSchema", "true").csv("data.csv")

      // Drop columns with 70% or more empty values.
      val validColumns: Array[String] =
        getColumnNamesAccordingToMissingValues(data, MaxMissingFraction)
      require(validColumns.contains(LabelCol),
        s"label column '$LabelCol' is missing or too sparse to train on")
      // Rows without a label cannot be trained or evaluated on; drop them rather than imputing the label.
      val relevantCols: DataFrame =
        data.select(validColumns.head, validColumns.tail: _*).na.drop(Seq(LabelCol))

      // Impute numeric features with their column mean. Spark 2.2's Imputer was not available,
      // so this is done by hand. The label is excluded.
      val featureCols: Array[String] = validColumns.filterNot(_ == LabelCol)
      val imputed: DataFrame = replaceMissingValuesWithAverage(relevantCols, featureCols)

      // Categorical features are the string columns that survived the missing-value filter
      // (derived from the filtered frame, not the raw one, so dropped columns are not indexed).
      val catCols: Array[String] =
        imputed.dtypes.collect { case (name, "StringType") if name != LabelCol => name }
      // Tokenizer and StringIndexer fail on null strings, so give missing text an explicit empty value.
      val clean: DataFrame = imputed.na.fill("", catCols)
      clean.show(10)

      // Apply the TF-IDF feature extractor to the free-text column.
      val textFeatures = s"${TextCol}_features"
      val rescaledData: DataFrame = addTfIdfFeature(clean, TextCol, textFeatures)

      // One-hot encode the categorical features; the label is indexed separately in the pipeline.
      val encoded: DataFrame = addOnHotEncouding(rescaledData, catCols)
      val catFeatures: Array[String] = catCols.map(c => s"${c}_hot")
      val numFeatures: Array[String] = featureCols.filterNot(c => catCols.contains(c))
      val features: Array[String] = catFeatures ++ numFeatures :+ textFeatures

      // Assemble a single feature vector for the random forest.
      val assembler = new VectorAssembler()
        .setInputCols(features)
        .setOutputCol("features")
      val ready = assembler.transform(encoded)
      ready.show(10)

      // Randomly split the data into a training set (95%) and a test set (5%).
      val Array(training, test) = ready.randomSplit(Array(0.95, 0.05), seed = 12345)

      val labelIndexer = new StringIndexer()
        .setInputCol(LabelCol)
        .setOutputCol("label")
        .fit(ready)
      val rf = new RandomForestClassifier()
        .setLabelCol("label")
        .setFeaturesCol("features")
      // Convert indexed predictions back to the original label values.
      val labelConverter = new IndexToString()
        .setInputCol("prediction")
        .setOutputCol("predictedLabel")
        .setLabels(labelIndexer.labels)
      val pipeline = new Pipeline()
        .setStages(Array(labelIndexer, rf, labelConverter))

      // Use cross-validation over tree depth and forest size to pick the best model.
      val paramGrid = new ParamGridBuilder()
        .addGrid(rf.maxDepth, Array(4, 6, 8))
        .addGrid(rf.numTrees, Array(10, 30, 50))
        .build()
      val cv = new CrossValidator()
        .setEstimator(pipeline)
        .setEvaluator(new BinaryClassificationEvaluator)
        .setEstimatorParamMaps(paramGrid)
        .setNumFolds(3)
      val cvModel = cv.fit(training)
      val predictions = cvModel.transform(test)
      // Show some example predictions from the test set.
      predictions.select("predictedLabel", "label", "features").show(10)

      // Evaluate the best model on the held-out test set.
      val evaluator = new MulticlassClassificationEvaluator()
        .setLabelCol("label")
        .setPredictionCol("prediction")
        .setMetricName("accuracy")
      val accuracy = evaluator.evaluate(predictions)
      println("Test Error = " + (1.0 - accuracy))
    } finally {
      spark.stop()
    }
  }

  /**
   * Tokenizes `inputCol`, hashes the tokens into `numFeatures` buckets and rescales the term
   * counts by inverse document frequency.
   *
   * The intermediate columns `<inputCol>_words` and `<inputCol>_rawFeatures` are kept in the result.
   *
   * @param clean       input frame; `inputCol` must be a string column without nulls
   * @param inputCol    text column to featurize
   * @param outputCol   name of the TF-IDF vector column to add
   * @param numFeatures number of hashing buckets (20 by default)
   * @return `clean` plus the intermediate columns and `outputCol`
   */
  def addTfIdfFeature(clean: DataFrame, inputCol: String, outputCol: String,
                      numFeatures: Int = 20): DataFrame = {
    val wordsCol = s"${inputCol}_words"
    val rawCol = s"${inputCol}_rawFeatures"
    val tokenizer = new Tokenizer().setInputCol(inputCol).setOutputCol(wordsCol)
    val hashingTF = new HashingTF()
      .setInputCol(wordsCol)
      .setOutputCol(rawCol)
      .setNumFeatures(numFeatures)
    val featurizedData = hashingTF.transform(tokenizer.transform(clean))
    val idfModel = new IDF().setInputCol(rawCol).setOutputCol(outputCol).fit(featurizedData)
    idfModel.transform(featurizedData)
  }

  /**
   * Returns the columns of `data` whose fraction of null values is strictly below `threshold`.
   *
   * All fractions are computed in one aggregation pass. An empty frame has no measurable
   * missing values, so all of its columns are returned.
   *
   * @param data      frame to inspect
   * @param threshold maximum allowed null fraction (exclusive), e.g. 0.7
   * @return the retained column names, in their original order
   */
  def getColumnNamesAccordingToMissingValues(data: DataFrame, threshold: Double): Array[String] = {
    val rows = data.count()
    if (rows == 0) {
      data.columns
    } else {
      val keepExprs: Array[Column] = data.columns.map(c =>
        (sum(col(c).isNull.cast("double")) / rows < threshold).alias(c))
      val keepFlags: Row = data.select(keepExprs: _*).first()
      // Read the flags by position: one lookup per column instead of rebuilding a map per column.
      data.columns.zipWithIndex.collect {
        case (c, i) if !keepFlags.isNullAt(i) && keepFlags.getBoolean(i) => c
      }
    }
  }

  /**
   * Fills nulls in the numeric columns among `columns` with that column's mean.
   *
   * Each mean is taken over the column's own non-null values (Spark's `mean` skips nulls), so a
   * sparse column does not shrink the sample used for the others. Non-numeric columns are left
   * untouched, as are numeric columns containing only nulls. `na.fill` casts each mean to the
   * column's type.
   *
   * @param df      frame to impute
   * @param columns candidate columns to impute
   * @return `df` with nulls in the numeric candidate columns replaced
   */
  def replaceMissingValuesWithAverage(df: DataFrame, columns: Array[String]): DataFrame = {
    val wanted = columns.toSet
    val numericCols: Array[String] = df.dtypes.collect {
      case (name, dtype) if wanted.contains(name) && isNumericDType(dtype) => name
    }
    if (numericCols.isEmpty) {
      df
    } else {
      val avgExpr: Array[Column] = numericCols.map(c => mean(c).alias(c))
      val colAvgs: Row = df.agg(avgExpr.head, avgExpr.tail: _*).first()
      // An all-null column has a null mean; na.fill rejects null replacement values, so skip it.
      val numericColsAvgMap: Map[String, Any] =
        colAvgs.getValuesMap[Any](colAvgs.schema.fieldNames).filter(_._2 != null)
      df.na.fill(numericColsAvgMap)
    }
  }

  /**
   * Indexes each column in `catCols` with a StringIndexer into `<c>_ix`, then one-hot encodes
   * that index into `<c>_hot`.
   *
   * The indexers are fit on `df` and the whole pipeline is then applied to `df`. The method name
   * (including its spelling) is kept for existing callers.
   *
   * @param df      input frame
   * @param catCols string columns to encode; they must not contain nulls
   * @return `df` plus the `_ix` and `_hot` columns
   */
  def addOnHotEncouding(df: DataFrame, catCols: Array[String]): DataFrame = {
    val indexers = catCols.map(c =>
      new StringIndexer()
        .setInputCol(c)
        .setOutputCol(s"${c}_ix"))
    val onHots = catCols.map(c =>
      new OneHotEncoder()
        .setInputCol(s"${c}_ix")
        .setOutputCol(s"${c}_hot"))
    val indexerPipeline = new Pipeline().setStages(indexers ++ onHots)
    indexerPipeline.fit(df).transform(df)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment