Skip to content

Instantly share code, notes, and snippets.

@myui
Last active August 3, 2017 04:50
Show Gist options
  • Save myui/c731b49e336955a3caeca9bc1339df69 to your computer and use it in GitHub Desktop.
Save myui/c731b49e336955a3caeca9bc1339df69 to your computer and use it in GitHub Desktop.
FFM on Criteo dataset

Data preparation

-- set mapred.max.split.size=128000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.tez.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.mapjoin.smalltable.filesize=30000000;
-- set hive.optimize.s3.query=true;
set hive.exec.dynamic.partition.mode=nonstrict; 
set hive.optimize.sort.dynamic.partition=false;
create database criteo;
use criteo;

drop table train_raw;
create external table train_raw (
  rowid bigint,
  label int,
  -- quantitative variables
  i1 int,i2 int,i3 int,i4 int,i5 int,i6 int,i7 int,i8 int,i9 int,i10 int,i11 int,i12 int,i13 int,
  -- categorical variables
  c1 string,c2 string,c3 string,c4 string,c5 string,c6 string,c7 string,c8 string,c9 string,c10 string,c11 string,c12 string,c13 string,c14 string,c15 string,c16 string,c17 string,c18 string,c19 string,c20 string,c21 string,c22 string,c23 string,c24 string,c25 string,c26 string
) ROW FORMAT 
DELIMITED FIELDS TERMINATED BY '\t' 
STORED AS TEXTFILE LOCATION 's3://myui-dev/emr/warehouse/criteo/train_raw/';

drop table test_raw;
create external table test_raw (
  rowid bigint,
  -- quantitative variables
  i1 int,i2 int,i3 int,i4 int,i5 int,i6 int,i7 int,i8 int,i9 int,i10 int,i11 int,i12 int,i13 int,
  -- categorical variables
  c1 string,c2 string,c3 string,c4 string,c5 string,c6 string,c7 string,c8 string,c9 string,c10 string,c11 string,c12 string,c13 string,c14 string,c15 string,c16 string,c17 string,c18 string,c19 string,c20 string,c21 string,c22 string,c23 string,c24 string,c25 string,c26 string
) ROW FORMAT 
DELIMITED FIELDS TERMINATED BY '\t' 
STORED AS TEXTFILE LOCATION 's3://myui-dev/emr/warehouse/criteo/test_raw/';
awk '{IFS="\t"; OFS="\t"} {print 10000000+NR-1,$0}' train.txt | \
hadoop fs -put - /dataset/criteo/train/train.txt

awk '{IFS="\t"; OFS="\t"} {print 60000000+NR-1,$0}' test.txt | \
hadoop fs -put - /dataset/criteo/test/test.txt

Feature Engineering

drop table train;
create external table train (
  rowid bigint,
  features array<string>,
  label int
)
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/train/' 
TBLPROPERTIES ("orc.compress"="SNAPPY"); 

INSERT OVERWRITE TABLE train 
select
  rowid, 
  add_field_indicies(array_concat(
    quantitative_features(
      array('i1','i2','i3','i4','i5','i6','i7','i8','i9','i10','i11','i12','i13'),
      ln(i1+1),
      ln(i2+4), -- min(i2) is -3
      ln(i3+1),
      ln(i4+1),
      ln(i5+1),
      ln(i6+1),
      ln(i7+1),
      ln(i8+1),
      ln(i9+1),
      ln(i10+1),
      ln(i11+1),
      ln(i12+1),
      ln(i13+1),
      "-emit_null"
    ),
    categorical_features(
      array('c1','c2','c3','c4','c5','c6','c7','c8','c9','c10','c11','c12','c13','c14','c15','c16','c17','c18','c19','c20','c21','c22','c23','c24','c25','c26'),
      c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c25,c26,
      "-emit_null -force_value"
    )
  )) as features,
  label
from 
  train_raw
where
  label IS NOT NULL -- rowid 54203165
CLUSTER BY rand(1)
;

drop table test;
create external table test (
  rowid bigint,
  features array<string>
)
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/test/' 
TBLPROPERTIES ("orc.compress"="SNAPPY"); 

INSERT OVERWRITE TABLE test 
select
  rowid, 
  add_field_indicies(array_concat(
    quantitative_features(
      array('i1','i2','i3','i4','i5','i6','i7','i8','i9','i10','i11','i12','i13'),
      ln(i1+1),
      ln(i2+4), -- min(i2) is -3
      ln(i3+1),
      ln(i4+1),
      ln(i5+1),
      ln(i6+1),
      ln(i7+1),
      ln(i8+1),
      ln(i9+1),
      ln(i10+1),
      ln(i11+1),
      ln(i12+1),
      ln(i13+1),
      "-emit_null"
    ),
    categorical_features(
      array('c1','c2','c3','c4','c5','c6','c7','c8','c9','c10','c11','c12','c13','c14','c15','c16','c17','c18','c19','c20','c21','c22','c23','c24','c25','c26'),
      c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c25,c26,
      "-emit_null -force_value"
    )
  )) as features
from 
  test_raw
;

Training

-- Hive on Tez
SET tez.task.resource.memory.mb=3072;
SET hive.tez.java.opts=-server -Xmx2560m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC -XX:+HeapDumpOnOutOfMemoryError;
-- Hive on MapReuce
SET mapreduce.map.memory.mb=3072;
SET mapreduce.map.java.opts=-server -Xmx2560m -XX:+PrintGCDetails -XX:+UseNUMA -XX:+UseParallelGC -XX:+HeapDumpOnOutOfMemoryError;

SET mapred.max.split.size=64000000; -- use more mappers to avoid OOM in mappers
drop table ffm_model;
create external table ffm_model (
  model_id string,
  i int,
  Wi float,
  Vi array<float>
) 
STORED AS ORC
LOCATION 's3://myui-dev/emr/warehouse/criteo/ffm_model/' 
TBLPROPERTIES ("orc.compress"="SNAPPY"); 

INSERT OVERWRITE TABLE ffm_model
select
  train_ffm(features, label, "-c -iters 10 -factors 4 -feature_hashing 20")
from
  train
;

Prediction

drop table criteo2.ffm_predicted;
create table criteo2.ffm_predicted
  ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY "\t"
    LINES TERMINATED BY "\n"
  STORED AS TEXTFILE
as
WITH testing_exploded as (
  select
    t1.rowid, 
    t2.i,
    t2.j,
    t2.Xi,
    t2.Xj
  from
    test t1
    LATERAL VIEW feature_pairs(features, "-ffm -feature_hashing 20") t2 as i, j, Xi, Xj
),
predicted as (
  select
    t1.rowid,
    p1.model_id,
    ffm_predict(
      p1.Wi,      
      p1.Vi, -- Vij
      p2.Vi, -- Vji
      t1.Xi,
      t1.Xj
    ) as predicted
  from 
    testing_exploded t1
    LEFT OUTER JOIN ffm_model p1 ON (p1.i = t1.i)
    LEFT OUTER JOIN ffm_model p2 ON (p2.i = t1.j and p2.model_id = p1.model_id)  
  group by
    t1.rowid,
    p1.model_id
),
ensembled as (
  select
    rowid,
    voted_avg(predicted) as score -- classification
    -- avg(predicted) as score       -- regression
  from
    predicted  
  group by
    rowid
)
select
  rowid,
  sigmoid(score) as predicted -- classification
  -- score as predicted -- regression
from
  ensembled
;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment