Created
December 1, 2022 16:54
-
-
Save rnyak/ff6a9a4033053ef2a46d46938df2f70b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import glob | |
import numpy as np | |
import pandas as pd | |
import cudf | |
import cupy as cp | |
import nvtabular as nvt | |
from nvtabular.ops import * | |
from merlin.schema.tags import Tags | |
# ---------------------------------------------------------------------------
# Build a synthetic session-interaction dataset for the ETL example.
# Produces `df` with one row per interaction and columns:
#   session_id, item_id, category, age_days, weekday_sin, day.
# NOTE(review): generation is unseeded, so the data differs on every run.
# ---------------------------------------------------------------------------
INPUT_DATA_DIR = os.environ.get("INPUT_DATA_DIR", "/workspace/data/")
NUM_ROWS = 10000

# Long-tailed item popularity: log-normal draws clipped into the id range [1, 50000].
long_tailed_item_distribution = np.clip(
    np.random.lognormal(3., 1., NUM_ROWS).astype(np.int32), 1, 50000
)

# Generate random item interaction features.
df = pd.DataFrame(np.random.randint(70000, 90000, NUM_ROWS), columns=['session_id'])
df['item_id'] = long_tailed_item_distribution

# Generate a category mapping for each item id: 334 equal-width buckets
# over the observed item-id range, labeled 1..334.
df['category'] = pd.cut(df['item_id'], bins=334, labels=np.arange(1, 335)).astype(np.int32)
df['age_days'] = np.random.uniform(0, 1, NUM_ROWS).astype(np.float32)
df['weekday_sin'] = np.random.uniform(0, 1, NUM_ROWS).astype(np.float32)

# Generate a day mapping for each session: every session id gets one random
# day in [1, 9], shared by all of that session's rows.
map_day = dict(zip(df.session_id.unique(), np.random.randint(1, 10, size=(df.session_id.nunique()))))
df['day'] = df.session_id.map(map_day)
# Maximum number of interactions kept per session after truncation.
SESSIONS_MAX_LENGTH = 10

# Encode the id-like columns as contiguous integer categories.
# start_index=1 reserves index 0 (presumably for padding / out-of-vocabulary
# values — TODO confirm against the NVTabular version in use).
categ_feats = ['session_id', 'item_id', 'category'] >> nvt.ops.Categorify(start_index=1)

# Define Groupby Workflow: collapse the row-per-interaction frame into one
# row per session, aggregating each feature column per session.
groupby_feats = categ_feats + ['day', 'age_days', 'weekday_sin']

# Group interaction features by session.
groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=["session_id"],
    aggs={
        "item_id": ["list", "count"],   # item sequence + session length
        "category": ["list"],
        "day": ["first"],               # one day per session (see map_day above)
        "age_days": ["list"],
        "weekday_sin": ["list"],
    },
    name_sep="-")                       # output names e.g. "item_id-list"
# Select and truncate the sequential features: ListSlice(-N) keeps the last
# SESSIONS_MAX_LENGTH interactions of each session; ValueCount records the
# list-length statistics in the output schema.
sequence_features_truncated = (
    groupby_features['category-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH)
    >> nvt.ops.ValueCount()
)

# Item-id sequence, additionally tagged as the item-id column so downstream
# next-item-prediction models can identify the prediction target.
sequence_features_truncated_item = (
    groupby_features['item_id-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH)
    >> TagAsItemID()
    >> nvt.ops.ValueCount()
)

# Continuous sequence features, tagged CONTINUOUS for the model schema.
sequence_features_truncated_cont = (
    groupby_features['age_days-list', 'weekday_sin-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH)
    >> nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
    >> nvt.ops.ValueCount()
)
# Filter out sessions with length 1 (not valid for next-item prediction
# training and evaluation: a single interaction has no "next item").
MINIMUM_SESSION_LENGTH = 2

# Combine the per-session scalar columns with the truncated sequence features.
selected_features = (
    groupby_features['item_id-count', 'day-first', 'session_id'] +
    sequence_features_truncated_item +
    sequence_features_truncated +
    sequence_features_truncated_cont
)

# Keep only sessions with at least MINIMUM_SESSION_LENGTH interactions.
# (The lambda's `df` parameter is the per-partition frame, not the module-level df.)
filtered_sessions = selected_features >> nvt.ops.Filter(
    f=lambda df: df["item_id-count"] >= MINIMUM_SESSION_LENGTH
)
# Re-attach value-count metadata to the truncated list columns after filtering.
# NOTE(review): only the four list columns are selected here, so the workflow's
# output drops the scalar columns (item_id-count, day-first, session_id) —
# confirm that is intended for the downstream consumer.
seq_feats_list = (
    filtered_sessions['item_id-list', 'category-list', 'age_days-list', 'weekday_sin-list']
    >> nvt.ops.ValueCount()
)

workflow = nvt.Workflow(seq_feats_list)

# cpu=False keeps the data on GPU — requires a CUDA device and cuDF.
dataset = nvt.Dataset(df, cpu=False)

# Generate statistics for the features (fit the workflow on the dataset).
workflow.fit(dataset)

# Apply the preprocessing and return an NVTabular dataset.
sessions_ds = workflow.transform(dataset)

# Persist the fitted workflow so the identical transforms can be re-applied later.
workflow.save('workflow_etl')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment