https://github.com/myui/hivemall/wiki/KDDCup-2012-track-2-CTR-prediction-dataset
create table training_libsvmfmt
ROW FORMAT DELIMITED
  LINES TERMINATED BY "\n"
STORED AS TEXTFILE
AS
select
  -- Hive arrays are 0-indexed, so the 12 sorted features live at [0]..[11]
  concat(label," ",features[0],":1.0 ",features[1],":1.0 ",features[2],":1.0 ",features[3],":1.0 ",features[4],":1.0 ",features[5],":1.0 ",features[6],":1.0 ",features[7],":1.0 ",features[8],":1.0 ",features[9],":1.0 ",features[10],":1.0 ",features[11],":1.0") as line
from (
  select
    -- LIBSVM requires feature indices in ascending order
    label, sort_array(features) as features
  from
    training_orcfile
) t;
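Next, train a logistic regression model on the exported file with Spark MLlib, presumably from spark-shell where sc is predefined. The time helper measures the wall-clock time of loading and training; numFeatures = 16777217 (2^24 + 1) covers the hashed feature id space, and the learned weights are written back to HDFS as "feature,weight" CSV lines so that Hive can join against them.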
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// Simple wall-clock timer for an arbitrary code block
def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block // call-by-name
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) / 1000000000.0 + " seconds")
  result
}

time {
  // Load the LIBSVM-formatted training data exported from Hive
  val training = MLUtils.loadLibSVMFile(sc, "hdfs://dm01:9000/user/hive/warehouse/kdd12track2.db/training_libsvmfmt", multiclass = false, numFeatures = 16777217, minPartitions = 64).coalesce(128)
  // Train logistic regression with SGD (a single pass over the data)
  val model1 = LogisticRegressionWithSGD.train(training, numIterations = 1)
  // Export weights as "feature,weight" CSV; +1 restores the 1-based LIBSVM feature ids
  val rdd1 = sc.parallelize(model1.weights.toArray.zipWithIndex).map{case (x, i) => s"${i+1},$x"}
  rdd1.saveAsTextFile("hdfs://dm01:9000/user/hive/warehouse/kdd12track2.db/lrmodel1_spark")
}
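Back in Hive, mount the directory Spark wrote as an external table so the weights become queryable: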
create external table lrmodel1_spark (
  feature int,
  weight double
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/hive/warehouse/kdd12track2.db/lrmodel1_spark';
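Finally, score the test set. Because every feature value is 1.0, the dot product w·x reduces to the sum of the weights of the features present in a row, and Hivemall's sigmoid() UDF maps that margin to a click probability. testing_exploded holds one (rowid, feature) pair per row, so the LEFT OUTER JOIN plus GROUP BY rowid computes the per-row sum; features with no learned weight contribute NULL, which sum() ignores.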
drop table if exists lr_predict_spark1;

create table lr_predict_spark1
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY "\t"
  LINES TERMINATED BY "\n"
STORED AS TEXTFILE
AS
select
  t.rowid,
  sigmoid(sum(m.weight)) as prob
from
  testing_exploded t
  LEFT OUTER JOIN lrmodel1_spark m ON (t.feature = m.feature)
group by
  t.rowid
order by
  rowid ASC;
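As a sanity check, the same probabilities could also be computed inside Spark, skipping the weight export and the Hive join. A minimal sketch, assuming the test set were also exported in LIBSVM format (the testing_libsvmfmt path below is hypothetical) and that model1 is still in scope, e.g. by having the time block return it; clearThreshold() makes predict() return the raw sigmoid probability instead of a 0/1 class:

// Sketch only, not part of the original benchmark; the test-set path is hypothetical.
val testing = MLUtils.loadLibSVMFile(sc, "hdfs://dm01:9000/user/hive/warehouse/kdd12track2.db/testing_libsvmfmt",
  multiclass = false, numFeatures = 16777217, minPartitions = 64)
model1.clearThreshold()  // predict() now returns sigmoid(w . x) rather than a thresholded label
val probs = testing.map(p => model1.predict(p.features))
probs.take(10).foreach(println)

Note that this loses the rowid association, which is why the walkthrough exports the weights and joins in Hive instead.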