Skip to content

Instantly share code, notes, and snippets.

@ironpeace
Last active August 29, 2015 13:57
Show Gist options
  • Save ironpeace/9583847 to your computer and use it in GitHub Desktop.
Save ironpeace/9583847 to your computer and use it in GitHub Desktop.
Kaggle(ds2014)用ETL処理
import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
object KaggleApp {
/**
 * Entry point: runs the ETL once over the training CSV and once over the test
 * CSV, then prints a CSV header line derived from the constructor parameters
 * of Grouped.
 *
 * The Spark context is always stopped on exit, including when a pass fails.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val sc = new SparkContext("local", "Kaggle")
  try {
    println(" ************************* train ")
    impl(sc.textFile("mydata/train.csv"), "train")
    println(" ************************* test ")
    impl(sc.textFile("mydata/test_v2.csv"), "test")
    println("Grouped's params as CSV header")
    // Placeholder instance handed to ExtractTypeInfo; presumably only its type
    // is inspected, not its values -- TODO confirm against ExtractTypeInfo.
    val g = new Grouped("","","","",0,"",0,"","",0,0,"","",0,false,false,false,false,false,false,false,false,false,false,false,false,0,0,0,0,"","","","","",0,0,0,0,"","","","","","","","","","","","","","",0,"","","","","","","",0,0,0,0,0,0,0,0,"",0)
    val tpe = ExtractTypeInfo.getType(g)
    println(ExtractTypeInfo.getParamsAsCsvHeader(tpe))
  } finally {
    // Release the local Spark context even if one of the passes threw.
    sc.stop()
  }
}
/**
 * Parses raw quote-history CSV lines, groups them per customer and writes one
 * Grouped feature row per customer as a text file.
 *
 * @param tf  CSV lines with 25 comma-separated columns in the order read by
 *            `parseHistory`. A header line would fail the numeric parsing, so
 *            the input is expected to contain data rows only.
 * @param typ "train" writes to mydata/groups.train using the quote records
 *            (record_type "0") for the aggregates and the purchase record
 *            (record_type "1") for the chosen options; "test" writes to
 *            mydata/groups.test using every record. Any other value only
 *            prints the line count.
 */
def impl(tf:RDD[String], typ:String): Unit = {
  // Check how many CSV lines were read.
  println(typ + " length : " + tf.count())
  // [info] train length : 665249

  // Store each CSV line in a History object.
  val histories:RDD[History] = tf.map(line => line.split(",")).map(cols => parseHistory(cols))

  // Group by customer_ID; each group is turned into one Grouped object below.
  // toSeq accepts both the Seq and the Iterable that groupByKey returns,
  // depending on the Spark version.
  val byCustomer = histories.map(hist => (hist.customer_ID, hist)).groupByKey()

  if(typ == "train"){
    val groupedTrain:RDD[Grouped] =
      byCustomer.mapValues(hists => toTrainGrouped(hists.toSeq)).map(groupedHistory => groupedHistory._2)
    // Check how many groups were produced.
    println("groupedTrain length :" + groupedTrain.count())
    groupedTrain.saveAsTextFile("mydata/groups.train")
  }else if(typ == "test"){
    val groupedTest:RDD[Grouped] =
      byCustomer.mapValues(hists => toTestGrouped(hists.toSeq)).map(groupedHistory => groupedHistory._2)
    println("groupedTest length :" + groupedTest.count())
    groupedTest.saveAsTextFile("mydata/groups.test")
  }
  /*
   As a result, several files named mydata/groups/part-0000* are created, so
   concatenate them into a single file like this:
     touch groups.csv
     cat groups/part-00000 >> groups.csv
     cat groups/part-00001 >> groups.csv
     cat groups.csv | wc -l
   */
}

/** Builds a History from the 25 columns of one CSV line. */
private def parseHistory(cols: Array[String]): History =
  new History(
    cols(0), // customer_ID
    cols(1), // shopping_pt
    cols(2), // record_type
    cols(3), // day
    cols(4), // time
    cols(5), // state
    cols(6), // location
    cols(7).toInt, // group_size
    cols(8), // homeowner
    cols(9).toInt, // car_age
    cols(10), // car_value
    cols(11), // risk_factor
    cols(12).toInt, // age_oldest
    cols(13).toInt, // age_youngest
    cols(14), // married_couple
    cols(15), // C_previous
    // Insert 0 when the value is NA.
    if(cols(16) == "NA") 0 else cols(16).toInt, // duration_previous
    cols(17), // A
    cols(18), // B
    cols(19), // C
    cols(20), // D
    cols(21), // E
    cols(22), // F
    cols(23), // G
    cols(24).toInt // cost
  )

/**
 * True when every record of the customer carries the same value for `key`:
 * grouping by the property yields exactly one group.
 */
private def isSingle[K](hists: Seq[History])(key: History => K): Boolean =
  hists.groupBy(key).size == 1

/** First record matching `p`; fails with a message naming the customer when there is none. */
private def requireRecord(hists: Seq[History], what: String)(p: History => Boolean): History =
  hists.find(p).getOrElse {
    val id = hists.headOption.map(_.customer_ID).getOrElse("?")
    throw new NoSuchElementException("customer " + id + " has no " + what)
  }

/** Feature row for one training customer (quote records plus the purchase record). */
private def toTrainGrouped(hists: Seq[History]): Grouped = {
  val quotes = hists.filter(_.record_type == "0")
  // shopping_pt is a String; sort numerically so that "10" comes after "9".
  val lastQuote = quotes.sortBy(_.shopping_pt.toInt).last
  val finalRecord = hists.sortBy(_.shopping_pt.toInt).last
  val purchase = requireRecord(hists, "purchase record (record_type 1)")(_.record_type == "1")
  val first = requireRecord(hists, "shopping_pt 1")(_.shopping_pt == "1")
  val second = requireRecord(hists, "shopping_pt 2")(_.shopping_pt == "2")
  val cheapest = quotes.minBy(_.cost)
  val priciest = quotes.maxBy(_.cost)
  val cA = howChg(hists, "A")
  val cB = howChg(hists, "B")
  val cC = howChg(hists, "C")
  val cD = howChg(hists, "D")
  val cE = howChg(hists, "E")
  val cF = howChg(hists, "F")
  val cG = howChg(hists, "G")

  new Grouped(
    lastQuote.customer_ID,
    lastQuote.day,
    lastQuote.state,
    lastQuote.location,
    lastQuote.group_size,
    lastQuote.homeowner,
    lastQuote.car_age,
    lastQuote.car_value,
    lastQuote.risk_factor,
    lastQuote.age_oldest,
    lastQuote.age_youngest,
    lastQuote.married_couple,
    lastQuote.c_previous,
    lastQuote.duration_previous,
    // isSingle_*: whether the property never changed across the customer's records.
    isSingle(hists)(_.state),
    isSingle(hists)(_.location),
    isSingle(hists)(_.group_size),
    isSingle(hists)(_.homeowner),
    isSingle(hists)(_.car_age),
    isSingle(hists)(_.car_value),
    isSingle(hists)(_.risk_factor),
    isSingle(hists)(_.age_oldest),
    isSingle(hists)(_.age_youngest),
    isSingle(hists)(_.married_couple),
    isSingle(hists)(_.c_previous),
    isSingle(hists)(_.duration_previous),
    quotes.size, // count
    cheapest.cost, // minCost
    quotes.map(_.cost).sum / quotes.size, // meanCost (integer division)
    priciest.cost, // maxCost
    cheapest.policy, // minCostPolicy
    priciest.policy, // maxCostPolicy
    lastQuote.policy, // evePolicy
    first.policy, // firstPolicy
    lastQuote.policy, // lastPolicy
    quotes.groupBy(_.day).size, // daysCount
    howLong(quotes), // howLong
    howPrev(hists), // howPrevPolicy
    finalRecord.cost, // finCost
    purchase.A,
    purchase.B,
    purchase.C,
    purchase.D,
    purchase.E,
    purchase.F,
    purchase.G,
    first.A,
    first.B,
    first.C,
    first.D,
    first.E,
    first.F,
    first.G,
    first.cost,
    second.A,
    second.B,
    second.C,
    second.D,
    second.E,
    second.F,
    second.G,
    second.cost,
    cA,
    cB,
    cC,
    cD,
    cE,
    cF,
    cG,
    getChgOpt(cA, cB, cC, cD, cE, cF, cG),
    howOptChg(cA, cB, cC, cD, cE, cF, cG)
  )
}

/** Feature row for one test customer; lastPolicy, howPrevPolicy and finCost are left blank/zero. */
private def toTestGrouped(hists: Seq[History]): Grouped = {
  // shopping_pt is a String; sort numerically so that "10" comes after "9".
  val last = hists.sortBy(_.shopping_pt.toInt).last
  val first = requireRecord(hists, "shopping_pt 1")(_.shopping_pt == "1")
  val second = requireRecord(hists, "shopping_pt 2")(_.shopping_pt == "2")
  val cheapest = hists.minBy(_.cost)
  val priciest = hists.maxBy(_.cost)
  val cA = howChg(hists, "A")
  val cB = howChg(hists, "B")
  val cC = howChg(hists, "C")
  val cD = howChg(hists, "D")
  val cE = howChg(hists, "E")
  val cF = howChg(hists, "F")
  val cG = howChg(hists, "G")

  new Grouped(
    last.customer_ID,
    last.day,
    last.state,
    last.location,
    last.group_size,
    last.homeowner,
    last.car_age,
    last.car_value,
    last.risk_factor,
    last.age_oldest,
    last.age_youngest,
    last.married_couple,
    last.c_previous,
    last.duration_previous,
    // isSingle_*: whether the property never changed across the customer's records.
    isSingle(hists)(_.state),
    isSingle(hists)(_.location),
    isSingle(hists)(_.group_size),
    isSingle(hists)(_.homeowner),
    isSingle(hists)(_.car_age),
    isSingle(hists)(_.car_value),
    isSingle(hists)(_.risk_factor),
    isSingle(hists)(_.age_oldest),
    isSingle(hists)(_.age_youngest),
    isSingle(hists)(_.married_couple),
    isSingle(hists)(_.c_previous),
    isSingle(hists)(_.duration_previous),
    hists.size, // count
    cheapest.cost, // minCost
    hists.map(_.cost).sum / hists.size, // meanCost (integer division)
    priciest.cost, // maxCost
    cheapest.policy, // minCostPolicy
    priciest.policy, // maxCostPolicy
    last.policy, // evePolicy
    first.policy, // firstPolicy
    "", // lastPolicy
    hists.groupBy(_.day).size, // daysCount
    howLong(hists), // howLong
    0, // howPrevPolicy
    0, // finCost
    last.A,
    last.B,
    last.C,
    last.D,
    last.E,
    last.F,
    last.G,
    first.A,
    first.B,
    first.C,
    first.D,
    first.E,
    first.F,
    first.G,
    first.cost,
    second.A,
    second.B,
    second.C,
    second.D,
    second.E,
    second.F,
    second.G,
    second.cost,
    cA,
    cB,
    cC,
    cD,
    cE,
    cF,
    cG,
    getChgOpt(cA, cB, cC, cD, cE, cF, cG),
    howOptChg(cA, cB, cC, cD, cE, cF, cG)
  )
}
/**
 * Number of steps back from the final record to the nearest earlier record
 * that carries the same policy as the final record.
 *
 * @param hists all records of one customer, in any order
 * @return 1 when the record directly before the final one has the same policy,
 *         2 for the one before that, and so on. When no earlier record
 *         matches, the number of earlier records is returned (the same value
 *         as a match on the oldest record). 0 for an empty input.
 */
def howPrev(hists:Seq[History]):Int = {
  // shopping_pt is a String; sort numerically so that "10" comes after "9".
  val newestFirst = hists.sortBy(_.shopping_pt.toInt).reverse // descending
  if (newestFirst.isEmpty) {
    0
  } else {
    val fin = newestFirst.head
    val prevs = newestFirst.tail
    val hit = prevs.indexWhere(_.policy == fin.policy)
    if (hit >= 0) hit + 1 else prevs.size
  }
}
/**
 * Accumulates a minute count over the customer's records, visited from the
 * newest shopping point to the oldest.
 *
 * For each record the time of day is converted to minutes with `toMin`. When
 * the record's day equals the previously visited day, the difference to the
 * previous time is added; otherwise the record's own minutes are added.
 *
 * NOTE(review): within a run of same-day records the sum telescopes to the
 * time of day of the last visited record rather than an elapsed duration --
 * confirm this is the intended measure before relying on it.
 *
 * @param hists records of one customer, in any order
 * @return 0 when there is exactly one record (or none), otherwise the
 *         accumulated value described above
 */
def howLong(hists:Seq[History]):Int = {
  // shopping_pt is a String; sort numerically so that "10" comes after "9".
  val newestFirst = hists.sortBy(_.shopping_pt.toInt).reverse // descending
  if(newestFirst.size == 1){
    0
  }else{
    // state : (previous day, previous time in minutes, accumulated minutes)
    newestFirst.foldLeft((0, 0, 0)){ (state, hist) =>
      val (preDay, preTime, acc) = state
      val nowTime = toMin(hist.time)
      val nowDay = hist.day.toInt
      if(preDay == nowDay){
        (nowDay, nowTime, nowTime - preTime + acc)
      }else{
        (nowDay, nowTime, nowTime + acc)
      }
    }._3
  }
}
/**
 * Converts a time of day to minutes since midnight.
 *
 * @param hhmm text whose characters 0-1 are the hours and 3-4 the minutes,
 *             e.g. "08:35"; the character at index 2 is ignored
 * @return hours * 60 + minutes
 */
def toMin(hhmm:String):Int =
  hhmm.substring(0, 2).toInt * 60 + hhmm.substring(3, 5).toInt
/**
 * Counts how many times one coverage option changed between consecutive
 * shopping points of a customer.
 *
 * @param hists records of one customer, in any order
 * @param a2g   option to inspect: one of "A" to "G"
 * @return number of changes; 0 when there are fewer than two records
 * @throws Exception when `a2g` is not one of "A" to "G" and there are at least
 *                   two records
 */
def howChg(hists:Seq[History], a2g:String):Int = {
  // shopping_pt is a String; sort numerically so that "10" comes after "9".
  val ordered = hists.sortBy(_.shopping_pt.toInt)
  if(ordered.size <= 1){
    0
  }else{
    // Resolve the accessor once instead of matching on the letter per record.
    val optionOf: History => String = a2g match {
      case "A" => (h: History) => h.A
      case "B" => (h: History) => h.B
      case "C" => (h: History) => h.C
      case "D" => (h: History) => h.D
      case "E" => (h: History) => h.E
      case "F" => (h: History) => h.F
      case "G" => (h: History) => h.G
      case _ => throw new Exception("Unknown a2g :" + a2g)
    }
    // state : (previous value, change count)
    ordered.foldLeft(("",0)){ (state, hist) =>
      howChangedImpl(state._1, optionOf(hist), state._2)
    }._2
  }
}
/**
 * Counts how many of the seven per-option change counts are positive.
 *
 * @param a change count of option A (likewise b to g for options B to G)
 * @return number of arguments greater than zero, between 0 and 7
 */
def howOptChg(a:Int, b:Int, c:Int, d:Int, e:Int, f:Int, g:Int) : Int =
  Seq(a, b, c, d, e, f, g).count(_ > 0)
/**
 * Encodes which of the seven options changed at least once as a label.
 *
 * @param a change count of option A (likewise b to g for options B to G)
 * @return "c" followed by seven digits in A to G order, "1" where the count is
 *         positive and "0" otherwise, e.g. "c1010001"
 */
def getChgOpt(a:Int, b:Int, c:Int, d:Int, e:Int, f:Int, g:Int) : String =
  Seq(a, b, c, d, e, f, g)
    .map(times => if (times > 0) "1" else "0")
    .mkString("c", "", "")
/**
 * One fold step of the change counter.
 *
 * @param pre   value seen on the previous record; "" means no previous value
 * @param opt   value on the current record
 * @param count changes counted so far
 * @return the current value paired with the new count: 0 when there was no
 *         previous value, `count` when the value is unchanged, `count + 1`
 *         when it differs
 */
def howChangedImpl(pre:String, opt:String, count:Int):(String, Int) = {
  val updated =
    if (pre == "") 0
    else if (pre == opt) count
    else count + 1
  (opt, updated)
}
}
import scala.reflect.runtime.{universe => ru}
import ru._
/**
 * One record of a customer's quote history, holding the 25 columns of a CSV
 * line in file order. Options A to G and most categorical columns are kept as
 * text; group_size, car_age, the two ages, duration_previous and cost are
 * integers.
 */
class History(
  val customer_ID:String,
  val shopping_pt:String,
  val record_type:String,
  val day:String,
  val time:String,
  val state:String,
  val location:String,
  val group_size:Int,
  val homeowner:String,
  val car_age:Int,
  val car_value:String,
  val risk_factor:String,
  val age_oldest:Int,
  val age_youngest:Int,
  val married_couple:String,
  val c_previous:String,
  val duration_previous:Int,
  val A:String,
  val B:String,
  val C:String,
  val D:String,
  val E:String,
  val F:String,
  val G:String,
  val cost:Int
) extends Serializable {

  /** The seven option values A to G concatenated into a single policy code. */
  def policy:String = Seq(A, B, C, D, E, F, G).mkString

  /** All fields joined with commas, in constructor order. */
  override def toString = {
    val columns = Seq[Any](
      customer_ID, shopping_pt, record_type, day, time, state, location,
      group_size, homeowner, car_age, car_value, risk_factor,
      age_oldest, age_youngest, married_couple, c_previous, duration_previous,
      A, B, C, D, E, F, G,
      cost
    )
    columns.mkString(",")
  }
}
/**
 * Per-customer feature row. `toString` renders the fields as one CSV line in
 * constructor order, with the five policy codes prefixed by "p".
 */
class Grouped(
  val customer_ID:String,
  val day:String,
  val state:String,
  val location:String,
  val group_size:Int,
  val homeowner:String,
  val car_age:Int,
  val car_value:String,
  val risk_factor:String,
  val age_oldest:Int,
  val age_youngest:Int,
  val married_couple:String,
  val c_previous:String,
  val duration_previous:Int,
  val isSingle_state:Boolean,
  val isSingle_location:Boolean,
  val isSingle_group_size:Boolean,
  val isSingle_homeowner:Boolean,
  val isSingle_car_age:Boolean,
  val isSingle_car_value:Boolean,
  val isSingle_risk_factor:Boolean,
  val isSingle_age_oldest:Boolean,
  val isSingle_age_youngest:Boolean,
  val isSingle_married_couple:Boolean,
  val isSingle_c_previous:Boolean,
  val isSingle_duration_previous:Boolean,
  val count:Int, // how many quotes were made in total
  val minCost:Int,
  val meanCost:Int,
  val maxCost:Int,
  val minCostPolicy:String,
  val maxCostPolicy:String,
  val evePolicy:String,
  val firstPolicy:String,
  val lastPolicy:String,
  val daysCount:Int,
  val howLong:Int,
  val howPrevPolicy:Int,
  val finCost:Int,
  val vA:String,
  val vB:String,
  val vC:String,
  val vD:String,
  val vE:String,
  val vF:String,
  val vG:String,
  val vA1:String,
  val vB1:String,
  val vC1:String,
  val vD1:String,
  val vE1:String,
  val vF1:String,
  val vG1:String,
  val cost1:Int,
  val vA2:String,
  val vB2:String,
  val vC2:String,
  val vD2:String,
  val vE2:String,
  val vF2:String,
  val vG2:String,
  val cost2:Int,
  val cA:Int, // how many times option A changed (likewise cB to cG)
  val cB:Int,
  val cC:Int,
  val cD:Int,
  val cE:Int,
  val cF:Int,
  val cG:Int,
  val chgOpt:String,
  val howChanged:Int
) extends Serializable
{
  /** CSV line with 71 columns in constructor order. */
  override def toString = {
    val profile = Seq[Any](
      customer_ID, day, state, location, group_size, homeowner, car_age,
      car_value, risk_factor, age_oldest, age_youngest, married_couple,
      c_previous, duration_previous
    )
    val singleFlags = Seq[Any](
      isSingle_state, isSingle_location, isSingle_group_size,
      isSingle_homeowner, isSingle_car_age, isSingle_car_value,
      isSingle_risk_factor, isSingle_age_oldest, isSingle_age_youngest,
      isSingle_married_couple, isSingle_c_previous, isSingle_duration_previous
    )
    val costStats = Seq[Any](count, minCost, meanCost, maxCost)
    // Policy codes carry a "p" prefix in the output.
    val policies = Seq(minCostPolicy, maxCostPolicy, evePolicy, firstPolicy, lastPolicy)
      .map(code => "p" + code)
    val timing = Seq[Any](daysCount, howLong, howPrevPolicy, finCost)
    val chosen = Seq[Any](vA, vB, vC, vD, vE, vF, vG)
    val firstPoint = Seq[Any](vA1, vB1, vC1, vD1, vE1, vF1, vG1, cost1)
    val secondPoint = Seq[Any](vA2, vB2, vC2, vD2, vE2, vF2, vG2, cost2)
    val changes = Seq[Any](cA, cB, cC, cD, cE, cF, cG, chgOpt, howChanged)

    (profile ++ singleFlags ++ costStats ++ policies ++ timing ++
      chosen ++ firstPoint ++ secondPoint ++ changes).mkString(",")
  }
}
/**
 * Reference data for one state. `toString` renders the fields as one CSV line
 * in constructor order.
 */
class States(
  val state:String, // state abbreviation
  val state_e:String, // state name (English)
  val state_j:String, // state name (Japanese)
  val kokusei1:String, // census 1 code
  val kokusei1_ryk:String, // census 1 abbreviation
  val kokusei2:String, // census 2 code
  val kokusei2_ryk:String, // census 2 abbreviation
  val gyosei:String, // Office of Management and Budget code
  val gyosei_ryk:String, // Office of Management and Budget abbreviation
  val keizai:String, // Bureau of Economic Analysis
  val keizai_ryk:String, // Bureau of Economic Analysis abbreviation
  val hanzai:Int, // violent crime rate
  val hanzai_hyoka:String, // violent crime rating
  val jinko:Int, // population
  val jinko_mitsudo:Int, // population density
  val jinko_mitsudo_hyoka:String, // population density rating
  val shotoku_jumin:Int, // income per resident
  val shotoku_kojin:Int, // personal income
  val shotoku_hensa:Int, // standard score
  val shotoku_hyoka:Int, // income rating
  val menkyo_seigen_ari:Int, // restricted licence age
  val menkyo:Int, // licence age
  val menkyo_joken:String // whether licence conditions apply
) extends Serializable
{
  /** CSV line with 23 columns in constructor order. */
  override def toString = {
    val columns = Seq[Any](
      state, state_e, state_j,
      kokusei1, kokusei1_ryk, kokusei2, kokusei2_ryk,
      gyosei, gyosei_ryk, keizai, keizai_ryk,
      hanzai, hanzai_hyoka,
      jinko, jinko_mitsudo, jinko_mitsudo_hyoka,
      shotoku_jumin, shotoku_kojin, shotoku_hensa, shotoku_hyoka,
      menkyo_seigen_ari, menkyo, menkyo_joken
    )
    columns.mkString(",")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment