Makoto YUI (@myui)

run: +main

_export:
  td:
    apikey: ${TD_API_KEY}
    database: criteo
    engine: hive

+main:
  +prepare:

Item-based collaborative filtering

1. Prepare transaction table

Prepare the following transaction table. We generate a feature_vector for each item_id based on the co-occurrence of purchased items, a sort of basket analysis. A hedged HiveQL sketch of this preparation step follows the sample rows below.

userid | item_id | purchase_at (timestamp)
1      | 31231   | 2015-04-09 00:29:02
1      | 13212   | 2016-05-24 16:29:02
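To make step 1 concrete, here is a minimal HiveQL sketch. Only userid, item_id, and purchase_at come from the sample above; the table names transaction and item_cooccurrence, and the exact co-occurrence query, are assumptions rather than the gist's own recipe.

-- Hedged sketch: declare the transaction table, then count co-purchases per item pair.
CREATE TABLE IF NOT EXISTS transaction (
  userid INT,
  item_id INT,
  purchase_at TIMESTAMP
);

-- For every pair of distinct items bought by the same user, count the co-occurrences.
-- These counts are the raw material of each item_id's feature_vector.
CREATE TABLE item_cooccurrence AS
SELECT
  t1.item_id,
  t2.item_id AS other_item,
  count(1)   AS cnt
FROM
  transaction t1
  JOIN transaction t2 ON (t1.userid = t2.userid)
WHERE
  t1.item_id != t2.item_id
GROUP BY
  t1.item_id, t2.item_id;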
/*
* Hivemall: Hive scalable Machine Learning Library
*
* Copyright (C) 2015 Makoto YUI
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
myui / gnuplot.plt
Created September 2, 2016 02:24
gnuplot
# Plot 1: hivemall_twitter2d.dat -- series "x" on the y2 axis, "outlier" score on y1
set ytics nomirror
unset y2tics
set y2range [-200:300]
set yrange [0:60]
set datafile separator " "
plot \
  "hivemall_twitter2d.dat" using 1:2 with lines title "x" axes x1y2, \
  "hivemall_twitter2d.dat" using 1:3 with lines title "outlier" axes x1y1

# Plot 2: cf2d.dat -- series "x" on y1, "outlier" and "change" scores on y2
set ytics nomirror
set y2tics
set yrange [-20:20]
plot \
  "cf2d.dat" using 1:2 with lines title "x", \
  "cf2d.dat" using 1:3 with lines title "outlier" axes x1y2, \
  "cf2d.dat" using 1:4 with lines title "change" axes x1y2
a = load 'a9a.train'
    as (rowid:int, label:float, features:{(featurepair:chararray)});
b = foreach a generate flatten(
      logress(features, label, '-total_steps ${total_steps}')
    ) as (feature, weight);
c = group b by feature;
d = foreach c generate group, AVG(b.weight);  -- model averaging per feature
store d into 'a9a_model1';
val trainDf =
  spark.read.format("libsvm").load("a9a.train")
trainDf.train_logregr($"feature", $"label")
  .groupBy("feature")
  .agg("weight" -> "avg")
val context = new HiveContext(sc)
context.sql("""
  SELECT
    feature, avg(weight) as weight
  FROM (
    SELECT train_logregr(features, label)
             as (feature, weight)
    FROM trainTable
  ) model
  GROUP BY feature""")
val testData =
ssc.textFileStream(...).map(LabeledPoint.parse) // Infinite stream
testData.predict { case testDf =>
// Explode features in input streams
val testDf_exploded = ...
testDf_exploded
.join(model, testDf_exploded("feature") === model("feature"), "LEFT_OUTER")
.select($"rowid", ($"weight" * $"value").as("value"))
.groupby("rowid").sum("value")
myui / lr.sql
Created October 29, 2016 01:41
CREATE TABLE lr_model AS
SELECT
feature, -- reducers perform model averaging in parallel
avg(weight) as weight
FROM (
SELECT logress(features,label,..) as (feature,weight)
FROM train
) t -- map-only task
GROUP BY feature; -- shuffled to reducers
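Once lr_model exists, prediction follows the same shape as the Spark streaming snippet above: join exploded test features against the model, take the weighted sum per row, and squash it with a sigmoid. This is a hedged sketch; the table test_exploded and its (rowid, feature, value) layout are assumptions, not part of the gist.

-- Hedged scoring sketch: test_exploded(rowid, feature, value) is assumed.
SELECT
  t.rowid,
  sigmoid(sum(m.weight * t.value)) AS prob   -- predicted probability of the positive class
FROM
  test_exploded t
  LEFT OUTER JOIN lr_model m ON (t.feature = m.feature)
GROUP BY
  t.rowid;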