import pandas as pd

def splitDataFrameList(df, target_column, separator):
    ''' df = dataframe to split,
    target_column = the column containing the values to split
    separator = the symbol used to perform the split
    returns: a dataframe with each entry for the target column separated, with each element moved into a new row.
    The values in the other columns are duplicated across the newly divided rows.
    '''
    def splitListToRows(row, row_accumulator, target_column, separator):
        split_row = row[target_column].split(separator)
        for s in split_row:
            new_row = row.to_dict()  # duplicate the other columns for each split element
            new_row[target_column] = s
            row_accumulator.append(new_row)
    new_rows = []
    df.apply(splitListToRows, axis=1, args=(new_rows, target_column, separator))
    return pd.DataFrame(new_rows)
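A note on the design: splitListToRows appends plain dicts to a shared accumulator instead of concatenating DataFrames inside the loop, so the whole split is a single pass over the rows followed by one DataFrame construction at the end, rather than repeated copies of a growing frame.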
import scala.collection.JavaConverters._
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row

def identityMatrix(n: Int): Array[Array[String]] = Array.tabulate(n, n)((x, y) => if (x == y) "1" else "0")

def encodeStringOneHot(table: org.apache.spark.sql.DataFrame, column: String) = {
  // Accepts the dataframe and the target column name. Returns a new dataframe in which
  // the target column has been replaced with a one-hot/dummy encoding.
  table.registerTempTable("temp")
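  // A sketch of how the encoding might continue from the temp table; the
  // distinct-value query, schema construction, and join below are assumptions,
  // not recovered source code.
  val distinct_values = sqlContext.sql(s"SELECT DISTINCT $column FROM temp").collect().map(_.getString(0))
  val dummies = identityMatrix(distinct_values.length)
  // One StringType indicator column per distinct value, keyed by the original column
  val schema = StructType(StructField(column, StringType) +: distinct_values.map(v => StructField(s"${column}_$v", StringType)))
  val rows = distinct_values.zip(dummies).map { case (v, flags) => Row.fromSeq(v +: flags.toSeq) }
  val dummy_df = sqlContext.createDataFrame(sc.parallelize(rows), schema)
  // Join the indicators back on the original column, then drop it per the docstring
  table.join(dummy_df, column).drop(column)
}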
// For each (country, elapsed-months) group, build a map from CAMEO code to its count
val cameo_maps = event_data_ag1.rdd
  .groupBy(x => (x.getAs[String]("Country"), x.getAs[Int]("ElapsedMonths")))
  .map { case (group_features, codes) => group_features -> codes
    .map { code => code.getAs[Int]("CAMEO Code") -> code.getAs[Long]("count") }
    .toMap
  }
// The full, sorted set of codes seen anywhere, broadcast so every group aligns identically
val cameos = sc.broadcast(cameo_maps.map(_._2.keySet).reduce(_ union _).toArray.sorted)
// Fixed-length count arrays in the shared code order; codes absent from a group count as 0
val cameo_arrays = cameo_maps.map {
  case ((country, total_months), cameo_map) => (country, total_months) -> cameos.value.map(cameo_map.getOrElse(_, 0L))
}
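With every group's counts aligned to the same broadcast code ordering, each array is a fixed-width feature vector. A minimal sketch of one possible next step, assuming MLlib vectors are the target (this step is not in the original snippet):

import org.apache.spark.mllib.linalg.Vectors
// One dense vector per (country, month) key, in the shared CAMEO-code order
val cameo_vectors = cameo_arrays.mapValues(counts => Vectors.dense(counts.map(_.toDouble)))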
import org.apache.spark.sql.DataFrame

def groupOutcomeFractions(df: DataFrame, outcome: String, outer_group_criteria: Seq[String]): DataFrame = {
  df.registerTempTable("df")
  val count_variable: String = outer_group_criteria.head
  val inner_group_criteria = outer_group_criteria :+ outcome
  // Row counts per outer group, then per (outer group, outcome) subgroup
  val outer_group_query = "SELECT " + outer_group_criteria.mkString(" , ") + s", COUNT($count_variable) AS outer_count FROM df GROUP BY " + outer_group_criteria.mkString(" , ")
  val outer_count = sqlContext.sql(outer_group_query)
  val inner_count_query = "SELECT " + inner_group_criteria.mkString(" , ") + s", COUNT($count_variable) AS inner_count FROM df GROUP BY " + inner_group_criteria.mkString(" , ")
  val inner_count = sqlContext.sql(inner_count_query)
  val combined_counts = inner_count.join(outer_count, outer_group_criteria)
  // Assumed completion of the truncated tail: each subgroup's share of its outer group
  combined_counts.withColumn("fraction", combined_counts("inner_count") / combined_counts("outer_count"))
}
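Hypothetical usage, with the DataFrame and column names invented for illustration:

val outcome_shares = groupOutcomeFractions(events_df, "Outcome", Seq("Country", "Year"))
outcome_shares.orderBy("Country", "Year").show()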
def groupCountFractionals(dataframe, target, outer):
    '''
    dataframe: a pandas dataframe
    target: a string corresponding to the column of interest in the dataframe
    outer: a list of the columns by which the counts should be conditioned
    Returns the fraction of target_criteria_group / outer_criteria_group counts.
    Be mindful to take group sizes (Outer Count) into consideration.
    As outer count gets smaller, the fraction value becomes increasingly sensitive to individual observations.
    '''
    # Body reconstructed from the docstring: count each (outer, target) subgroup, count each outer group, divide
    inner_counts = dataframe.groupby(outer + [target]).size().rename('Inner Count').reset_index()
    outer_counts = dataframe.groupby(outer).size().rename('Outer Count').reset_index()
    merged = inner_counts.merge(outer_counts, on=outer)
    merged['Fraction'] = merged['Inner Count'] / merged['Outer Count']
    return merged
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{callUDF, struct}

// Returns the comma-separated indices of the null fields in a row, or "-1" if none are null
def findNull(row: Row): String = {
  if (row.anyNull) {
    val indices = (0 to row.length - 1).toArray.filter(i => row.isNullAt(i))
    indices.mkString(",")
  }
  else "-1"
}
sqlContext.udf.register("findNull", findNull _)
// Flag every row with the indices of its missing fields, packing all columns into a struct
df = df.withColumn("MissingGroups", callUDF("findNull", struct(df.columns.map(df(_)): _*)))
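To see which missingness patterns actually occur and how often, one can aggregate on the new column (illustrative follow-up, not part of the original snippet):

df.groupBy("MissingGroups").count().show()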
// Keep the feature names whose paired source column is not in the missings set, plus the label column
val retained_features: List[String] = group_mean_columns.filter(x => !missings.contains(x._2)).map(_._1).toList :+ "LogAdjustedDemand"
// Select these columns in the training dataset; select(head, tail: _*) matches the select(String, String*) overload
val model_training_data = training_data_all.select(retained_features.head, retained_features.tail: _*)
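For reference, the same selection can be written against select's Column varargs overload, which avoids the head/tail split (name below is illustrative):

val model_training_data2 = training_data_all.select(retained_features.map(training_data_all(_)): _*)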