# Gist rnyak/f5f50abcb92c987fdbcec88d6a50cf84 (created October 27, 2021 15:50).
# Note from the GitHub page: this file contains bidirectional Unicode text that
# may be interpreted or compiled differently than what appears below. To review
# it, open the file in an editor that reveals hidden Unicode characters.
# Build a small synthetic interaction table (one row per session/item event)
# to feed the NVTabular session-based preprocessing workflow below.
import os
import numpy as np
import pandas as pd
import nvtabular as nvt

NUM_ROWS = 1000

# Item ids drawn from a log-normal distribution (mean 3, sigma 1 in log space),
# truncated to int32 and clipped into [1, 50000], so a few items dominate
# (long tail).
long_tailed_item_distribution = np.clip(
    np.random.lognormal(3., 1., NUM_ROWS).astype(np.int32), 1, 50000
)

# generate random item interaction features
# session ids are drawn uniformly from [70000, 80000); repeats form sessions.
df = pd.DataFrame(np.random.randint(70000, 80000, NUM_ROWS), columns=['session_id'])
df['item_id'] = long_tailed_item_distribution

# generate category mapping for each item-id: 334 equal-width bins over the
# item_id range, labelled 1..334, so neighbouring ids share a category.
df['category'] = pd.cut(df['item_id'], bins=334, labels=np.arange(1, 335)).astype(np.int32)
# Continuous features sampled uniformly from [0, 1).
df['timestamp/age_days'] = np.random.uniform(0, 1, NUM_ROWS)
df['timestamp/weekday/sin'] = np.random.uniform(0, 1, NUM_ROWS)

# generate day mapping for each session: every distinct session_id gets one
# random day in [1, 9], so all rows of a session share the same day.
map_day = dict(zip(df.session_id.unique(), np.random.randint(1, 10, size=(df.session_id.nunique()))))
df['day'] = df.session_id.map(map_day)
# Categorify categorical features: encode the raw id columns as integer
# categories. start_index=1 offsets the encoded values (presumably to keep 0
# free, e.g. for padding -- confirm against the NVTabular version in use).
categ_feats = ['session_id', 'category', 'item_id'] >> nvt.ops.Categorify(start_index=1)

# Define Groupby Workflow: encoded ids plus the raw day/continuous columns.
groupby_feats = categ_feats + ['day', 'timestamp/age_days', 'timestamp/weekday/sin']

# Group interaction features by session into per-session lists.
# NOTE(review): no sort_cols is passed to Groupby, so the order of elements
# inside each list is NOT controlled by timestamp here. Add
# sort_cols=[...] if time-ordered sequences are required.
groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=["session_id"],
    aggs={
        "item_id": ["list", "count"],
        "category": ["list"],
        "day": ["first"],
        "timestamp/age_days": ["list"],
        'timestamp/weekday/sin': ["list"],
    },
    name_sep="-",
)

# Select the sequential (list) features, keep at most the first 20 elements of
# each list, and suffix the resulting column names with "_trim".
sequence_features_truncated = (
    groupby_features[
        'category-list', 'item_id-list',
        'timestamp/age_days-list', 'timestamp/weekday/sin-list',
    ]
    >> nvt.ops.ListSlice(0, 20)
    >> nvt.ops.Rename(postfix='_trim')
)

# Filter out sessions with length 1 (not valid for next-item prediction
# training and evaluation).
MINIMUM_SESSION_LENGTH = 2
selected_features = groupby_features['item_id-count', 'day-first', 'session_id'] + sequence_features_truncated
filtered_sessions = selected_features >> nvt.ops.Filter(
    # `frame` is the grouped partition being filtered; keep rows whose
    # pre-truncation item count meets the minimum.
    f=lambda frame: frame["item_id-count"] >= MINIMUM_SESSION_LENGTH
)

# Record list-length statistics for the trimmed sequence columns.
val_count_feats = filtered_sessions[
    'item_id-list_trim', 'category-list_trim',
    'timestamp/age_days-list_trim', 'timestamp/weekday/sin-list_trim',
] >> nvt.ops.ValueCount()

# Tag the output columns so downstream consumers can tell item ids,
# other categoricals and continuous features apart.
feats_item = val_count_feats['item_id-list_trim'] >> nvt.ops.AddMetadata(tags=["categorical", "item_id"])
feats_categ = val_count_feats['category-list_trim'] >> nvt.ops.AddMetadata(tags=["categorical"])
feats_cont = val_count_feats['timestamp/age_days-list_trim', 'timestamp/weekday/sin-list_trim'] >> nvt.ops.AddMetadata(tags=["continuous"])

# Only the tagged sequence columns are outputs of the workflow.
workflow = nvt.Workflow(feats_item + feats_categ + feats_cont)
# Wrap the synthetic DataFrame as an NVTabular dataset (cpu=False: process on
# GPU rather than CPU).
dataset = nvt.Dataset(df, cpu=False)

# Generating statistics for the features (e.g. Categorify vocabularies).
workflow.fit(dataset)

# Build the transformed dataset once and reuse it for both the in-memory
# preview and the parquet export, instead of calling transform() twice.
transformed = workflow.transform(dataset)
sessions_gdf = transformed.to_ddf().compute()
print(sessions_gdf.tail())

# Write the processed sessions to ./tmp/, one output file per process.
transformed.to_parquet(
    './tmp/',
    out_files_per_proc=1,
)
# (GitHub page footer: sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.)