import org.apache.spark.sql.functions._
import spark.implicits._

// Read a semicolon-delimited CSV, letting Spark infer column types.
val df = spark
  .read
  .option("inferSchema", "true")
  .option("header", "true")
  .option("delimiter", ";")
  .csv("/Users/guilherme.braccialli/Desktop/simulado_1000_20k.csv")

// 70/30 train/test split with a fixed seed for reproducibility.
val Array(dfTrain, dfTest) = df.randomSplit(Array(0.7, 0.3), seed = 3)
dfTrain.cache
dfTest.cache

// Check counts.
println("train count=" + dfTrain.count)
println("test count=" + dfTest.count)
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._
import scala.collection.parallel._

// For every column except the id and label, train a one-feature decision tree
// in parallel and replace the column with the tree's predicted probability.
// Returns (AUC summary, discretized train set, discretized test set).
def categVars(dfTrain: DataFrame, dfTest: DataFrame, idColumn: String, labelColumn: String, parallelism: Int) = {
  val parColumns = dfTrain
    .schema
    .fields
    .filter(field => !(field.name.toLowerCase == idColumn.toLowerCase || field.name.toLowerCase == labelColumn.toLowerCase))
    .map{field => (field.name,
      field.dataType match {
        case StringType => false  // string columns need indexing first
        case _          => true   // everything else is treated as numeric
      }
    )}
    .par
  // Cap the parallel collection at `parallelism` concurrent column jobs.
  parColumns.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(parallelism))
  val outCateg = parColumns
    .map{case (fieldName, isNumeric) => trainTreeAndReturnCategorical(dfTrain, dfTest, idColumn, labelColumn, fieldName, isNumeric)}
  val dfSummary = outCateg.map(r => (r._1, r._2, r._3)).seq.toSeq.toDF("column", "AUCTrain", "AUCTest")
  // Join each per-column output back on the id to rebuild full DataFrames.
  val dfTrainDiscretized = outCateg.map(r => r._4).foldLeft(dfTrain.select(idColumn, labelColumn))((buffer, element) => buffer.join(element, Seq(idColumn)))
  val dfTestDiscretized = outCateg.map(r => r._5).foldLeft(dfTest.select(idColumn, labelColumn))((buffer, element) => buffer.join(element, Seq(idColumn)))
  (dfSummary, dfTrainDiscretized, dfTestDiscretized)
}
import org.apache.spark.ml._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.classification._
import org.apache.spark.ml.evaluation._
import org.apache.spark.sql.functions.udf

// Train a decision tree on a single column and return its train/test AUC plus
// DataFrames in which the column is replaced by the tree's probability of
// class 0 (a supervised discretization of the original values).
def trainTreeAndReturnCategorical(dfTrain: DataFrame, dfTest: DataFrame, idColumn: String, labelColumn: String, columnToDiscretize: String, isNumeric: Boolean): (String, Double, Double, DataFrame, DataFrame) = {
  println("processing field: " + columnToDiscretize)
  // Replace nulls with a sentinel so StringIndexer/VectorAssembler don't fail.
  val dfTrainCol = dfTrain.select(col(idColumn), col(labelColumn), coalesce(col(columnToDiscretize), if (isNumeric) lit(-3333333) else lit("__null__")).as(columnToDiscretize))
  val dfTestCol = dfTest.select(col(idColumn), col(labelColumn), coalesce(col(columnToDiscretize), if (isNumeric) lit(-3333333) else lit("__null__")).as(columnToDiscretize))
  val catIndex = new StringIndexer().setInputCol(columnToDiscretize).setOutputCol(columnToDiscretize + "_index").setHandleInvalid("keep")
  val labelIndexer = new StringIndexer().setInputCol(labelColumn).setOutputCol("label")
  val features = new VectorAssembler()
    .setInputCols(Array(if (isNumeric) columnToDiscretize else columnToDiscretize + "_index"))
    .setOutputCol("features")
  val model = new DecisionTreeClassifier()
  // String columns get a StringIndexer stage before assembly; numeric ones don't.
  val pipeline = new Pipeline().setStages(
    (if (isNumeric) Array(features) else Array(catIndex, features)) :+ labelIndexer :+ model
  )
  val fit = pipeline.fit(dfTrainCol)
  val resultTrain = fit.transform(dfTrainCol)
  val AUCTrain = new BinaryClassificationEvaluator().evaluate(resultTrain)
  val resultTest = fit.transform(dfTestCol)
  val AUCTest = new BinaryClassificationEvaluator().evaluate(resultTest)
  // Extract a single element from the probability vector column.
  val getVectorElement = udf((vector: org.apache.spark.ml.linalg.Vector, element: Integer) => vector(element))
  (
    columnToDiscretize,
    AUCTrain,
    AUCTest,
    resultTrain.select(col(idColumn), getVectorElement(col("probability"), lit(0)).as(columnToDiscretize)),
    resultTest.select(col(idColumn), getVectorElement(col("probability"), lit(0)).as(columnToDiscretize))
  )
}
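
// Optional smoke test (not in the original gist): run the helper on a single
// column before the full parallel sweep. "some_column" is a placeholder;
// substitute any non-id, non-label column from your CSV, with the final flag
// set to true for numeric columns and false for string columns.
// val (name, aucTr, aucTe, trDisc, teDisc) =
//   trainTreeAndReturnCategorical(dfTrain, dfTest, "key", "y", "some_column", true)
// println(name + " AUC train=" + aucTr + " test=" + aucTe)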
// Run over all columns with up to 5 trees training in parallel; "key" is the
// id column and "y" the binary label in this dataset.
val (summary, dfTrainD, dfTestD) = categVars(dfTrain, dfTest, "key", "y", 5)
summary.show
dfTrainD.show
dfTestD.show

// Count distinct values per discretized column and melt the result into a
// long (column, number_distinct_values) format. `display` is a Databricks
// notebook helper; use .show instead outside Databricks.
val cols = dfTrainD.columns.map(c => countDistinct(col(c)).as(c))
display(
  dfTrainD
    .agg(cols.head, cols.tail: _*)
    .select(map(dfTrainD.columns.flatMap(c => Seq(lit(c), col(c))): _*).as("all"))
    .select(explode(col("all")))
    .withColumnRenamed("value", "number_distinct_values")
)
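
// A possible next step (not part of the original gist): assemble the
// discretized probability columns into one feature vector and fit a simple
// downstream model on them. The model choice and names below are assumptions,
// not part of the original script.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val featureCols = dfTrainD.columns.filter(c => c != "key" && c != "y")
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val labelIdx = new StringIndexer().setInputCol("y").setOutputCol("label")
val lrPipeline = new Pipeline().setStages(Array(assembler, labelIdx, new LogisticRegression()))
val lrFit = lrPipeline.fit(dfTrainD)
val lrAUC = new BinaryClassificationEvaluator().evaluate(lrFit.transform(dfTestD))
println("downstream logistic regression AUC on test = " + lrAUC)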