-- Session settings for reading many small S3/HDFS files efficiently.
-- Combine small input files into fewer splits (applies to both MR and Tez).
-- set mapred.max.split.size=128000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.tez.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-- Tables up to ~30 MB may be broadcast as the small side of a map join.
set hive.mapjoin.smalltable.filesize=30000000;
-- set hive.optimize.s3.query=true;
-- Allow dynamic-partition inserts without a static partition column.
set hive.exec.dynamic.partition.mode=nonstrict;
-- Keep the unsorted dynamic-partition insert path (sorting disabled).
set hive.optimize.sort.dynamic.partition=false;
-- Create the working database (idempotent: safe to re-run) and switch to it.
create database if not exists criteo;
use criteo;
-- Raw training data as tab-separated text on S3:
-- rowid + label + 13 quantitative (i1-i13) + 26 categorical (c1-c26) columns,
-- matching the Criteo display-advertising dataset layout.
drop table if exists train_raw;
create external table train_raw (
rowid bigint,
label int,
-- quantitative variables
i1 int,i2 int,i3 int,i4 int,i5 int,i6 int,i7 int,i8 int,i9 int,i10 int,i11 int,i12 int,i13 int,
-- categorical variables
c1 string,c2 string,c3 string,c4 string,c5 string,c6 string,c7 string,c8 string,c9 string,c10 string,c11 string,c12 string,c13 string,c14 string,c15 string,c16 string,c17 string,c18 string,c19 string,c20 string,c21 string,c22 string,c23 string,c24 string,c25 string,c26 string
) ROW FORMAT
DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE LOCATION 's3://myui-dev/emr/warehouse/criteo/train_raw/';
-- Raw test data: identical layout to train_raw but without the label column.
drop table if exists test_raw;
create external table test_raw (
rowid bigint,
-- quantitative variables
i1 int,i2 int,i3 int,i4 int,i5 int,i6 int,i7 int,i8 int,i9 int,i10 int,i11 int,i12 int,i13 int,
-- categorical variables
c1 string,c2 string,c3 string,c4 string,c5 string,c6 string,c7 string,c8 string,c9 string,c10 string,c11 string,c12 string,c13 string,c14 string,c15 string,c16 string,c17 string,c18 string,c19 string,c20 string,c21 string,c22 string,c23 string,c24 string,c25 string,c26 string
) ROW FORMAT
DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE LOCATION 's3://myui-dev/emr/warehouse/criteo/test_raw/';
-- NOTE: the following are SHELL commands, not SQL — left uncommented they
-- break this script when executed by Hive. They document how the raw files
-- were given a leading rowid column (train starts at 10000000, test at
-- 60000000 so the id ranges are disjoint) and uploaded to HDFS.
-- Run them from a shell, not from this script:
--
--   awk '{IFS="\t"; OFS="\t"} {print 10000000+NR-1,$0}' train.txt | \
--     hadoop fs -put - /dataset/criteo/train/train.txt
--   awk '{IFS="\t"; OFS="\t"} {print 60000000+NR-1,$0}' test.txt | \
--     hadoop fs -put - /dataset/criteo/test/test.txt
-- Preprocessed training table: Hivemall-style feature strings stored as ORC.
drop table if exists train;
create external table train (
rowid bigint,
features array<string>,
label int
)
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/train/'
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Build feature vectors:
--   * quantitative i1-i13 are log-scaled via ln(x+1); i2 is shifted by +4
--     because min(i2) is -3 (see inline comment)
--   * categorical c1-c26 are passed through; "-emit_null" keeps missing
--     values as explicit features, "-force_value" keeps the value part
--   * add_field_indicies prefixes each feature with its field index,
--     as required by field-aware factorization machines
-- CLUSTER BY rand(1) shuffles the rows (seeded, hence reproducible) so that
-- training sees them in random order.
INSERT OVERWRITE TABLE train
select
rowid,
add_field_indicies(array_concat(
quantitative_features(
array('i1','i2','i3','i4','i5','i6','i7','i8','i9','i10','i11','i12','i13'),
ln(i1+1),
ln(i2+4), -- min(i2) is -3
ln(i3+1),
ln(i4+1),
ln(i5+1),
ln(i6+1),
ln(i7+1),
ln(i8+1),
ln(i9+1),
ln(i10+1),
ln(i11+1),
ln(i12+1),
ln(i13+1),
"-emit_null"
),
categorical_features(
array('c1','c2','c3','c4','c5','c6','c7','c8','c9','c10','c11','c12','c13','c14','c15','c16','c17','c18','c19','c20','c21','c22','c23','c24','c25','c26'),
c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c25,c26,
"-emit_null -force_value"
)
)) as features,
label
from
train_raw
where
label IS NOT NULL -- rowid 54203165
CLUSTER BY rand(1)
;
-- Preprocessed test table: same feature encoding as train, minus the label.
drop table if exists test;
create external table test (
rowid bigint,
features array<string>
)
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/test/'
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Must mirror the training-side encoding exactly (same ln(x+1) scaling,
-- same i2 shift, same UDF options) so test features line up with the model.
INSERT OVERWRITE TABLE test
select
rowid,
add_field_indicies(array_concat(
quantitative_features(
array('i1','i2','i3','i4','i5','i6','i7','i8','i9','i10','i11','i12','i13'),
ln(i1+1),
ln(i2+4), -- min(i2) is -3
ln(i3+1),
ln(i4+1),
ln(i5+1),
ln(i6+1),
ln(i7+1),
ln(i8+1),
ln(i9+1),
ln(i10+1),
ln(i11+1),
ln(i12+1),
ln(i13+1),
"-emit_null"
),
categorical_features(
array('c1','c2','c3','c4','c5','c6','c7','c8','c9','c10','c11','c12','c13','c14','c15','c16','c17','c18','c19','c20','c21','c22','c23','c24','c25','c26'),
c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c25,c26,
"-emit_null -force_value"
)
)) as features
from
test_raw
;
-- Container/JVM sizing for training: 3 GB container with a 2.5 GB heap
-- (the gap leaves headroom for off-heap/native memory).
-- Hive on Tez
SET tez.task.resource.memory.mb=3072;
SET hive.tez.java.opts=-server -Xmx2560m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC -XX:+HeapDumpOnOutOfMemoryError;
-- Hive on MapReduce
SET mapreduce.map.memory.mb=3072;
SET mapreduce.map.java.opts=-server -Xmx2560m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC -XX:+HeapDumpOnOutOfMemoryError;
SET mapred.max.split.size=64000000; -- use more mappers to avoid OOM in mappers
-- FFM model table: one row per (model_id, feature index i) holding the
-- linear weight Wi and the latent factor vector Vi. One model_id is emitted
-- per training task, giving an ensemble of models.
drop table if exists ffm_model;
create external table ffm_model (
model_id string,
i int,
Wi float,
Vi array<float>
)
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/ffm_model/'
TBLPROPERTIES ("orc.compress"="SNAPPY");
-- Train field-aware factorization machines:
--   -c: classification, 10 iterations, 4 latent factors,
--   feature hashing into 2^20 buckets to bound the feature space.
INSERT OVERWRITE TABLE ffm_model
select
train_ffm(features, label, "-c -iters 10 -factors 4 -feature_hashing 20")
from
train
;
-- NOTE(review): this block writes to database "criteo2" while the rest of the
-- script creates and uses "criteo" — confirm the target database is
-- intentional before running.
drop table if exists criteo2.ffm_predicted;
create table criteo2.ffm_predicted
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "\t"
LINES TERMINATED BY "\n"
STORED AS TEXTFILE
as
-- Explode each test row into FFM (i, j, Xi, Xj) feature pairs, using the
-- same 2^20 feature-hashing space as training.
WITH testing_exploded as (
select
t1.rowid,
t2.i,
t2.j,
t2.Xi,
t2.Xj
from
test t1
LATERAL VIEW feature_pairs(features, "-ffm -feature_hashing 20") t2 as i, j, Xi, Xj
),
-- Score each (rowid, model_id): join the model twice to fetch the latent
-- vectors for both sides of every feature pair, then aggregate with
-- ffm_predict. LEFT OUTER JOINs keep pairs whose features were unseen at
-- training time.
predicted as (
select
t1.rowid,
p1.model_id,
ffm_predict(
p1.Wi,
p1.Vi, -- Vij
p2.Vi, -- Vji
t1.Xi,
t1.Xj
) as predicted
from
testing_exploded t1
LEFT OUTER JOIN ffm_model p1 ON (p1.i = t1.i)
LEFT OUTER JOIN ffm_model p2 ON (p2.i = t1.j and p2.model_id = p1.model_id)
group by
t1.rowid,
p1.model_id
),
-- Combine the per-model scores of the ensemble into one score per row.
ensembled as (
select
rowid,
voted_avg(predicted) as score -- classification
-- avg(predicted) as score -- regression
from
predicted
group by
rowid
)
select
rowid,
sigmoid(score) as predicted -- classification
-- score as predicted -- regression
from
ensembled
;