Created
November 30, 2022 23:41
-
-
Save rnyak/6bf39e8e762f032d095c97913c00fe1a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import glob
import os

import cudf
import cupy as cp
import numpy as np
import pandas as pd

import nvtabular as nvt
from nvtabular.ops import *  # noqa: F403 -- provides TagAsItemID used below
from merlin.schema.tags import Tags
# --- Build a synthetic session-interaction dataset ------------------------
INPUT_DATA_DIR = os.environ.get("INPUT_DATA_DIR", "/workspace/data/")
NUM_ROWS = 100000

# Long-tailed item popularity: lognormal draws, clipped into [1, 50000].
long_tailed_item_distribution = np.clip(
    np.random.lognormal(3., 1., NUM_ROWS).astype(np.int32), 1, 50000
)

# One row per interaction event; session ids drawn uniformly from [70000, 90000).
df = pd.DataFrame({'session_id': np.random.randint(70000, 90000, NUM_ROWS)})
df['item_id'] = long_tailed_item_distribution

# Bucket each item id into one of 334 categories (labels 1..334).
df['category'] = pd.cut(df['item_id'], bins=334, labels=np.arange(1, 335)).astype(np.int32)

# Continuous features in [0, 1).
df['age_days'] = np.random.uniform(0, 1, NUM_ROWS).astype(np.float32)
df['weekday_sin'] = np.random.uniform(0, 1, NUM_ROWS).astype(np.float32)

# Assign every distinct session a random day in 1..9.
map_day = dict(
    zip(df.session_id.unique(),
        np.random.randint(1, 10, size=df.session_id.nunique()))
)
df['day'] = df.session_id.map(map_day)

# Sessions longer than this are truncated downstream.
SESSIONS_MAX_LENGTH = 20
# Encode the categorical columns as contiguous integer ids.
# start_index=1 reserves index 0 (commonly used for padding / OOV).
categ_feats = ['session_id', 'item_id', 'category'] >> nvt.ops.Categorify(start_index=1)

# Define Groupby Workflow: the categorified columns plus the raw
# continuous/day columns feed the session-level aggregation below.
groupby_feats = categ_feats + ['day', 'age_days', 'weekday_sin']

# Group interaction features by session: each session becomes one row with
# list-valued columns (one entry per interaction), a per-session item count,
# and the first observed day. name_sep="-" yields columns like "item_id-list".
groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=["session_id"],
    aggs={
        "item_id": ["list", "count"],
        "category": ["list"],
        "day": ["first"],
        "age_days": ["list"],
        'weekday_sin': ["list"],
    },
    name_sep="-")
# Select and truncate the sequential (list) features to the last
# SESSIONS_MAX_LENGTH interactions of each session; ValueCount records the
# resulting list-length bounds in the output schema.

# Truncated category sequence.
sequence_features_truncated = (
    groupby_features['category-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH)
    >> nvt.ops.ValueCount()
)

# Truncated item-id sequence, tagged as the item id so downstream models
# (e.g. Transformers4Rec) know which column is the prediction target.
sequence_features_truncated_item = (
    groupby_features['item_id-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH)
    >> TagAsItemID()
    >> nvt.ops.ValueCount()
)

# Truncated continuous sequences, tagged CONTINUOUS for the model schema.
sequence_features_truncated_cont = (
    groupby_features['age_days-list', 'weekday_sin-list']
    >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH)
    >> nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS])
    >> nvt.ops.ValueCount()
)
# Filter out sessions with length 1 (not valid for next-item prediction
# training and evaluation, which needs at least one input and one target).
MINIMUM_SESSION_LENGTH = 2

# Combine the scalar session-level columns with the truncated sequences.
selected_features = (
    groupby_features['item_id-count', 'day-first', 'session_id'] +
    sequence_features_truncated_item +
    sequence_features_truncated +
    sequence_features_truncated_cont
)

# Keep only sessions with at least MINIMUM_SESSION_LENGTH interactions.
filtered_sessions = selected_features >> nvt.ops.Filter(f=lambda df: df["item_id-count"] >= MINIMUM_SESSION_LENGTH)

# Re-annotate list-length metadata on the sequence columns after filtering.
seq_feats_list = filtered_sessions['item_id-list', 'category-list', 'age_days-list', 'weekday_sin-list'] >> nvt.ops.ValueCount()

# Final workflow: scalar columns + sequence columns.
workflow = nvt.Workflow(filtered_sessions['session_id', 'day-first', 'item_id-count'] + seq_feats_list)
# Wrap the in-memory frame as an NVTabular Dataset (GPU-backed: cpu=False).
dataset = nvt.Dataset(df, cpu=False)

# Generate statistics for the features. Fit exactly once: the original
# script called fit(), then transform(), then fit_transform() again --
# refitting the whole workflow and discarding the first transform's result.
workflow.fit(dataset)

# Apply the preprocessing and return an NVTabular dataset.
sessions_ds = workflow.transform(dataset)

# Persist the processed data and the fitted workflow for reuse at
# inference / serving time.
sessions_ds.to_parquet(os.path.join(INPUT_DATA_DIR, "processed_nvt"))
workflow.save('workflow_etl')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment