Skip to content

Instantly share code, notes, and snippets.

@rnyak
Created October 27, 2021 15:50
Show Gist options
  • Save rnyak/f5f50abcb92c987fdbcec88d6a50cf84 to your computer and use it in GitHub Desktop.
Save rnyak/f5f50abcb92c987fdbcec88d6a50cf84 to your computer and use it in GitHub Desktop.
import os
import numpy as np
import pandas as pd
import nvtabular as nvt
# Number of synthetic interaction rows to generate.
NUM_ROWS = 1000

# Item ids: log-normal draws (mean=3, sigma=1) truncated to int32 and clipped
# to the range [1, 50000].
raw_item_draws = np.random.lognormal(3., 1., NUM_ROWS)
long_tailed_item_distribution = np.clip(raw_item_draws.astype(np.int32), 1, 50000)

# One row per interaction. Session ids are random integers in [70000, 80000),
# so several rows may end up sharing the same session.
session_ids = np.random.randint(70000, 80000, NUM_ROWS)
df = pd.DataFrame(session_ids, columns=['session_id'])
df['item_id'] = long_tailed_item_distribution

# Category of an item: the observed item_id range is cut into 334 equal-width
# bins labelled 1..334, and the bin label is stored as an int32 column.
category_labels = np.arange(1, 335)
item_category = pd.cut(df['item_id'], bins=334, labels=category_labels)
df['category'] = item_category.astype(np.int32)

# Two continuous features, each drawn uniformly from [0, 1).
df['timestamp/age_days'] = np.random.uniform(0, 1, NUM_ROWS)
df['timestamp/weekday/sin'] = np.random.uniform(0, 1, NUM_ROWS)

# Every distinct session gets one random day in 1..9; all rows of that session
# then share the same day value.
distinct_sessions = df['session_id'].unique()
session_days = np.random.randint(1, 10, size=len(distinct_sessions))
map_day = dict(zip(distinct_sessions, session_days))
df['day'] = df['session_id'].map(map_day)
# --- Categorical encoding ---------------------------------------------------
# The three id-like columns go through Categorify, configured with
# start_index=1.
CATEGORICAL_COLUMNS = ['session_id', 'category', 'item_id']
categ_feats = CATEGORICAL_COLUMNS >> nvt.ops.Categorify(start_index=1)

# --- Session-level grouping -------------------------------------------------
# Interactions are grouped per session_id. Output columns are named
# "<column>-<agg>" because name_sep is "-".
# NOTE: no `sort_cols` is passed to Groupby, so nothing here explicitly orders
# the interactions inside a session (the frame has no raw timestamp column).
SESSION_AGGREGATIONS = {
    "item_id": ["list", "count"],
    "category": ["list"],
    "day": ["first"],
    "timestamp/age_days": ["list"],
    "timestamp/weekday/sin": ["list"],
}
groupby_feats = categ_feats + ['day', 'timestamp/age_days', 'timestamp/weekday/sin']
groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=["session_id"],
    aggs=SESSION_AGGREGATIONS,
    name_sep="-",
)

# --- Sequence truncation ----------------------------------------------------
# The list-valued columns are sliced to positions 0..20 and renamed with a
# "_trim" postfix.
SEQUENCE_COLUMNS = (
    'category-list',
    'item_id-list',
    'timestamp/age_days-list',
    'timestamp/weekday/sin-list',
)
sliced_sequences = groupby_features[SEQUENCE_COLUMNS] >> nvt.ops.ListSlice(0, 20)
sequence_features_truncated = sliced_sequences >> nvt.ops.Rename(postfix='_trim')

# --- Session filtering ------------------------------------------------------
# A session needs at least two interactions to be kept; single-interaction
# sessions are not usable for next-item prediction training and evaluation.
MINIMUM_SESSION_LENGTH = 2


def _has_minimum_length(sessions):
    """Return a boolean mask of sessions with enough interactions."""
    return sessions["item_id-count"] >= MINIMUM_SESSION_LENGTH


SESSION_LEVEL_COLUMNS = ('item_id-count', 'day-first', 'session_id')
selected_features = groupby_features[SESSION_LEVEL_COLUMNS] + sequence_features_truncated
filtered_sessions = selected_features >> nvt.ops.Filter(f=_has_minimum_length)

# --- Value counts and tagging -----------------------------------------------
# Only the trimmed list columns continue from here: they pass through
# ValueCount and are then tagged as categorical / continuous features.
TRIMMED_CONTINUOUS_COLUMNS = (
    'timestamp/age_days-list_trim',
    'timestamp/weekday/sin-list_trim',
)
TRIMMED_LIST_COLUMNS = ('item_id-list_trim', 'category-list_trim') + TRIMMED_CONTINUOUS_COLUMNS
val_count_feats = filtered_sessions[TRIMMED_LIST_COLUMNS] >> nvt.ops.ValueCount()

feats_item = val_count_feats['item_id-list_trim'] >> nvt.ops.AddMetadata(
    tags=["categorical", "item_id"]
)
feats_categ = val_count_feats['category-list_trim'] >> nvt.ops.AddMetadata(
    tags=["categorical"]
)
feats_cont = val_count_feats[TRIMMED_CONTINUOUS_COLUMNS] >> nvt.ops.AddMetadata(
    tags=["continuous"]
)

# The workflow's outputs are exactly the three tagged feature groups.
tagged_features = feats_item + feats_categ + feats_cont
workflow = nvt.Workflow(tagged_features)
# Wrap the pandas frame in an NVTabular Dataset, with cpu=False.
dataset = nvt.Dataset(df, cpu=False)

# Fit first so the workflow gathers the statistics its operators need.
workflow.fit(dataset)

# Materialise the transformed sessions in memory and show the last rows.
transformed_ddf = workflow.transform(dataset).to_ddf()
sessions_gdf = transformed_ddf.compute()
print(sessions_gdf.tail())

# Run the transform again and write the result as parquet under ./tmp/,
# with out_files_per_proc=1.
workflow.transform(dataset).to_parquet('./tmp/', out_files_per_proc=1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment