Skip to content

Instantly share code, notes, and snippets.

@rnyak
Last active September 6, 2021 23:57
Show Gist options
  • Save rnyak/e9b6d9d96718181aff1c9d47dca6aefc to your computer and use it in GitHub Desktop.
Save rnyak/e9b6d9d96718181aff1c9d47dca6aefc to your computer and use it in GitHub Desktop.
import os
import numpy as np
import pandas as pd
import glob
import shutil
import cudf
import cupy
import nvtabular as nvt
from nvtabular import ColumnSelector
# Synthetic interaction data: NUM_ROWS randomly generated events.
NUM_ROWS = 10000
# NOTE(review): not referenced below; SESSIONS_MAX_LENGTH is the value that
# actually truncates the session lists.
session_length = 20
# Use the `np` alias bound by `import numpy as np`; the bare name `numpy` is
# never imported in this file and raised NameError here.
inputs = {
    'user_session': np.random.randint(1, 10000, NUM_ROWS),
    'product_id': np.random.randint(1, 51996, NUM_ROWS),
    'category_id': np.random.randint(0, 332, NUM_ROWS),
    # Epoch seconds (converted later with `cudf.to_datetime(..., unit='s')`).
    'event_time_ts': np.random.randint(1570373000, 1670373390, NUM_ROWS),
    # NOTE(review): drawn independently per row, so it is not consistent for
    # rows sharing the same product_id.
    'prod_first_event_time_ts': np.random.randint(1570373000, 1570373382, NUM_ROWS),
    'price': np.random.uniform(0, 2750, NUM_ROWS),
}
df = cudf.DataFrame(inputs)
# Categorical encoding of the ID columns is currently disabled:
#cat_feats = ['user_session', 'product_id', 'category_id'] >> nvt.ops.Categorify()

# Time features.
# 'event_time_ts' holds epoch seconds; turn it into a datetime column that is
# exposed as 'event_time_dt'.
sessionTs = ['event_time_ts']
sessionTime = (
    sessionTs
    >> nvt.ops.LambdaOp(lambda col: cudf.to_datetime(col, unit='s'))
    >> nvt.ops.Rename(name='event_time_dt')
)
# Integer day of week taken from the datetime column via the `.dt.weekday`
# accessor, exposed as 'et_dayofweek'.
sessionTime_weekday = (
    sessionTime
    >> nvt.ops.LambdaOp(lambda col: col.dt.weekday)
    >> nvt.ops.Rename(name='et_dayofweek')
)
def _cycled_angle(col, max_value):
    """Return ``col`` mapped to radians, one full turn per ``max_value``.

    A small offset of 1e-6 is added to ``col`` before dividing by the period.
    """
    value_scaled = (col + 0.000001) / max_value
    return 2 * np.pi * value_scaled


def get_cycled_feature_value_sin(col, max_value):
    """Return the sine component of the cyclical encoding of ``col``.

    Args:
        col: numeric value(s) (scalar, array or Series) lying on a cycle.
        max_value: length of the cycle (e.g. 7 for day of week).
    """
    return np.sin(_cycled_angle(col, max_value))


def get_cycled_feature_value_cos(col, max_value):
    """Return the cosine component of the cyclical encoding of ``col``.

    Args:
        col: numeric value(s) (scalar, array or Series) lying on a cycle.
        max_value: length of the cycle (e.g. 7 for day of week).
    """
    return np.cos(_cycled_angle(col, max_value))
# Cyclical sin/cos encoding of the day of week (weekday + 1, period 7).
# The callables are wrapped in an explicit LambdaOp, like the other time
# features above, rather than relying on `>>` to convert a bare lambda.
weekday_sin = (
    sessionTime_weekday
    >> nvt.ops.LambdaOp(lambda col: get_cycled_feature_value_sin(col + 1, 7))
    >> nvt.ops.Rename(name='et_dayofweek_sin')
)
weekday_cos = (
    sessionTime_weekday
    >> nvt.ops.LambdaOp(lambda col: get_cycled_feature_value_cos(col + 1, 7))
    >> nvt.ops.Rename(name='et_dayofweek_cos')
)
from nvtabular.ops import Operator
# custom op for item recency
class ItemRecency(Operator):
    """Add, for each selected column, its age in days since the product's first event.

    For every selected column ``c`` a new column ``c + "_age_days"`` is
    written to the frame: ``(c - prod_first_event_time_ts) / 86400``, with
    negative ages replaced by 0. Both inputs are assumed to be epoch seconds
    (as produced by the synthetic data above) — TODO confirm for real data.
    """

    @staticmethod
    def _column_names(columns):
        # Under the ColumnSelector API (see output_column_names) `columns` is a
        # ColumnSelector whose plain column names are exposed via `.names`;
        # a plain list of names is still accepted.
        return columns.names if hasattr(columns, "names") else list(columns)

    def transform(self, columns, gdf):
        for column in self._column_names(columns):
            col = gdf[column]
            item_first_timestamp = gdf['prod_first_event_time_ts']
            delta_days = (col - item_first_timestamp) / (60 * 60 * 24)
            # Multiplying by the boolean mask zeroes out negative ages.
            gdf[column + "_age_days"] = delta_days * (delta_days >= 0)
        return gdf

    def output_column_names(self, columns):
        return ColumnSelector([column + "_age_days" for column in self._column_names(columns)])

    def dependencies(self):
        # Extra input column read by transform() besides the selected ones.
        return ["prod_first_event_time_ts"]
# Item recency: age in days of each event relative to the product's first
# event, then log-transformed, normalized and exposed under a single name.
recency_features = ['event_time_ts'] >> ItemRecency()
recency_features_norm = (
    recency_features
    >> nvt.ops.LogOp()
    >> nvt.ops.Normalize()
    >> nvt.ops.Rename(name='product_recency_days_log_norm')
)
# All time-derived features, concatenated in this order.
time_features = (
    sessionTime
    + sessionTime_weekday
    + weekday_sin
    + weekday_cos
    + recency_features_norm
)
# Smooth the long-tailed price distribution: log transform, then normalize.
price_log = (
    ['price']
    >> nvt.ops.LogOp()
    >> nvt.ops.Normalize()
    >> nvt.ops.Rename(name='price_log_norm')
)
# Relative Price to the average price for the category_id
def relative_price_to_avg_categ(col, gdf):
    """Return each row's price relative to its category's average price.

    Args:
        col: the category-average price for each row (the
            'avg_category_id_price' column this lambda is applied to).
        gdf: frame supplying the row's own 'price' column.

    Returns:
        ``(price - avg) / (avg + 1e-5)``, multiplied by 0 wherever the
        average is not positive.
    """
    epsilon = 1e-5
    relative = (gdf['price'] - col) / (col + epsilon)
    positive_avg_mask = (col > 0).astype(int)
    return relative * positive_avg_mask
# Mean price per category_id (JoinGroupby), exposed as 'avg_category_id_price'.
avg_category_id_pr = (
    ['category_id']
    >> nvt.ops.JoinGroupby(cont_cols=['price'], stats=["mean"])
    >> nvt.ops.Rename(name='avg_category_id_price')
)
# Price relative to that category mean; 'price' is handed to the lambda as an
# extra dependency.
relative_price_to_avg_category = (
    avg_category_id_pr
    >> nvt.ops.LambdaOp(relative_price_to_avg_categ, dependency=['price'])
    >> nvt.ops.Rename(name="relative_price_to_avg_categ_id")
)
# Every column fed into the session-level Groupby below, in this order.
groupby_feats = (
    ['event_time_ts', 'user_session', 'product_id', 'category_id']
    + time_features
    + price_log
    + relative_price_to_avg_category
)
# Define Groupby Workflow
# Group rows into sessions by user_session, sort each session by
# event_time_ts, and aggregate: "list" collects the per-event values in that
# order, "first"/"count" give session-level scalars. Output columns are named
# "<column>-<agg>" because name_sep="-".
groupby_features = groupby_feats >> nvt.ops.Groupby(
groupby_cols=["user_session"],
sort_cols=["event_time_ts"],
aggs={
'product_id': ["list", "count"],
'category_id': ["list"],
'event_time_ts': ["first"],
'event_time_dt': ["first"],
'et_dayofweek_sin': ["list"],
'et_dayofweek_cos': ["list"],
'price_log_norm': ["list"],
'relative_price_to_avg_categ_id': ["list"],
'product_recency_days_log_norm': ["list"]
},
name_sep="-")
# Names of the Groupby output columns that are not "-list" aggregates
# (e.g. "product_id-count", "event_time_dt-first").
groupby_features_nonlist = [x for x in groupby_features.output_columns.names if '-list' not in x]
# Maximum number of events kept per session list.
SESSIONS_MAX_LENGTH = 20
# Sessions whose "product_id-count" is below this are dropped by the Filter below.
MINIMUM_SESSION_LENGTH = 2
# Slice every list column to entries [0, SESSIONS_MAX_LENGTH) and append "_seq"
# to its name.
groupby_features_trim = ((groupby_features - groupby_features_nonlist)) >> nvt.ops.ListSlice(0,SESSIONS_MAX_LENGTH) >> nvt.ops.Rename(postfix = '_seq')
# calculate session day index based on the 'event_time_dt-first' column:
# whole days since the smallest value of that column, starting at 1.
# NOTE(review): col.min() is taken over whatever data this op is applied to
# (presumably per partition / per inference request), not a fixed reference date.
# NOTE(review): day_index is not added to selected_features below, so it never
# reaches the workflow output — confirm whether that is intended.
day_index = ((groupby_features['event_time_dt-first']) >>
nvt.ops.LambdaOp(lambda col: (col - col.min()).dt.days +1) >>
nvt.ops.Rename(f = lambda col: "day_index")
)
# Keep the non-list aggregates plus the truncated list columns.
selected_features = groupby_features[groupby_features_nonlist] + groupby_features_trim
# Drop the datetime column and keep only sessions with at least
# MINIMUM_SESSION_LENGTH events.
filtered_sessions = (selected_features- ['event_time_dt-first']) >> nvt.ops.Filter(f=lambda df: df["product_id-count"] >= MINIMUM_SESSION_LENGTH)
# Build the workflow graph and wrap the synthetic frame as an NVTabular
# Dataset (the two are independent of each other).
workflow = nvt.Workflow(filtered_sessions)
dataset = nvt.Dataset(df)
# Compute the statistics the ops need (e.g. Normalize, JoinGroupby), then
# apply the workflow. The result is handed back as a Dask DataFrame
# (presumably not materialized until computed).
workflow.fit(dataset)
sessions_gdf = workflow.transform(dataset).to_ddf()

# Export the fitted workflow as a Triton model repository entry.
from nvtabular.inference.triton import generate_nvtabular_model

generate_nvtabular_model(
    workflow=workflow,
    name='model_nvt',
    output_path='/workspace/TF4Rec/models/model_nvt/',
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment