package com.combient.sparkjob.tedsds

/**
  * Created by olu on 09/03/16.
  */

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler}
import org.apache.spark.sql.{DataFrame, Column, SaveMode}
import org.apache.spark.sql.types.{StructType, DoubleType, StructField, IntegerType}
import scopt.OptionParser
object PrepareData {

  case class Params(input: String = null, output: String = null)

  def main(args: Array[String]) {
    val defaultParams = Params()

    val parser = new OptionParser[Params]("Prepare data for sds") {
      head("PrepareData")
      arg[String]("<input_tsv>")
        .required()
        .text("HDFS input path to the TSV dataset")
        .action((x, c) => c.copy(input = x.trim))
      arg[String]("<output_parquet>")
        .required()
        .text("HDFS output path for the Parquet output")
        .action((x, c) => c.copy(output = x.trim))
      note(
        """
          |For example, the following command runs this app on a dataset:
          |
          | bin/spark-submit --class com.combient.sparkjob.tedsds.PrepareData \
          |   jarfile.jar \
          |   "/user/zeppelin/pdm001/tmp.83h1YRpXM2/train_FD001.txt" \
          |   "/share/tedsds/scaledd"
        """.stripMargin)
    }

    parser.parse(args, defaultParams).map { params =>
      run(params)
    } getOrElse {
      System.exit(1)
    }
  }
  def run(params: Params) {
    val conf = new SparkConf().setAppName("PrepareData")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
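    // Input layout: one engine per id, one row per cycle ("cykle"), three operational
    // settings and 21 sensor readings (s1..s21). This matches the space-delimited
    // NASA C-MAPSS turbofan files such as the train_FD001.txt in the spark-submit
    // example above.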
    val customSchema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("cykle", IntegerType, nullable = false),
      StructField("setting1", DoubleType, nullable = true),
      StructField("setting2", DoubleType, nullable = true),
      StructField("setting3", DoubleType, nullable = true),
      StructField("s1", DoubleType, nullable = true),
      StructField("s2", DoubleType, nullable = true),
      StructField("s3", DoubleType, nullable = true),
      StructField("s4", DoubleType, nullable = true),
      StructField("s5", DoubleType, nullable = true),
      StructField("s6", DoubleType, nullable = true),
      StructField("s7", DoubleType, nullable = true),
      StructField("s8", DoubleType, nullable = true),
      StructField("s9", DoubleType, nullable = true),
      StructField("s10", DoubleType, nullable = true),
      StructField("s11", DoubleType, nullable = true),
      StructField("s12", DoubleType, nullable = true),
      StructField("s13", DoubleType, nullable = true),
      StructField("s14", DoubleType, nullable = true),
      StructField("s15", DoubleType, nullable = true),
      StructField("s16", DoubleType, nullable = true),
      StructField("s17", DoubleType, nullable = true),
      StructField("s18", DoubleType, nullable = true),
      StructField("s19", DoubleType, nullable = true),
      StructField("s20", DoubleType, nullable = true),
      StructField("s21", DoubleType, nullable = true)))
    val df: DataFrame = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "false")
      .option("delimiter", " ")
      //.option("treatEmptyValuesAsNulls", "true")
      .option("mode", "PERMISSIVE")
      .schema(customSchema)
      .load(params.input)
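    // Note: the com.databricks.spark.csv data source is not bundled with Spark 1.x;
    // it must be supplied on the classpath, e.g. via
    // --packages com.databricks:spark-csv_2.10:1.4.0 (adjust to your Scala/package version).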
    df.persist()
    df.show(10)
    // calculate the maximum cycle (cykle) per engine id
    val maxCyclePerId = df.groupBy($"id").agg(max($"cykle").alias("maxcykle"))
    //maxCyclePerId.show

    // windows for classification
    val w1: Int = 30
    val w0: Int = 15

    val withrul = df.join(maxCyclePerId, "id")
      .withColumn("rul", maxCyclePerId("maxcykle") - df("cykle"))    // RUL = maxcykle - current cykle for each row
      .withColumn("label1", when($"rul" <= w1, 1).otherwise(0))      // label1: 1 if RUL <= w1, else 0
      .withColumn("label2", when($"rul" <= w0, 2).otherwise(when($"rul" <= w1, 1).otherwise(0))) // label2: 2 if RUL <= w0, 1 if RUL <= w1, else 0
    val windowRange = 5
    // see https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
    // and http://spark.apache.org/docs/latest/sql-programming-guide.html
    // PARTITION BY id ORDER BY cykle ROWS BETWEEN CURRENT ROW AND 5 FOLLOWING
    val w = Window.partitionBy("id").orderBy("cykle").rowsBetween(0, windowRange)
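    // Every aggregate below is evaluated over this forward-looking frame:
    // the current cycle plus the next windowRange cycles of the same engine.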
    // Per-sensor rolling statistics over the window w:
    //   a<i>  = rolling mean of s<i>
    //   sd<i> = rolling standard deviation of s<i> (sum of squared deviations from the rolling mean, divided by windowRange)
    val rollingCols: Seq[Column] = (1 to 21).flatMap { i =>
      val sensor = col(s"s$i")
      Seq(
        mean(sensor).over(w).as(s"a$i"),
        sqrt(sum(pow(sensor - mean(sensor).over(w), 2)).over(w) / windowRange).as(s"sd$i")
      )
    }
    val x = withrul.select(col("*") +: rollingCols: _*)
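    // After this select, x holds every original column plus the 42 derived columns
    // a1..a21 (rolling means) and sd1..sd21 (rolling standard deviations).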
    // Filter columns out of the feature set.
    // These columns had the lowest correlation factor: "sd11","sd20","sd4","sd12","sd17","sd8","sd15","sd7","sd2","sd3","sd21","setting1","setting2"
    val columns = x.columns.diff(Seq("id", "maxcykle", "rul", "label1", "label2", "sd11", "sd20", "sd4",
      "sd12", "sd17", "sd8", "sd15", "sd7", "sd2", "sd3", "sd21", "setting2", "setting3", "s18", "s19"))

    println(s"assembling these columns into the features vector: ${columns.toList}")
    // see https://spark.apache.org/docs/latest/ml-features.html
    // assemble the selected columns into a single feature vector
    val assembler = new VectorAssembler()
      .setInputCols(columns.toArray)
      .setOutputCol("features")

    // scale the features
    val scaler = new MinMaxScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")
    val withFeatures = assembler.transform(x)
    withFeatures.show(10)

    val scaledDF = scaler.fit(withFeatures).transform(withFeatures)

    // drop the unscaled features column (DataFrames are immutable, so keep the returned DataFrame)
    val result = scaledDF.drop("features")

    result.write.mode(SaveMode.Overwrite).parquet(params.output)
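    // The prepared dataset can be read back later with sqlContext.read.parquet(params.output).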
    sc.stop()
  }
}