Skip to content

Instantly share code, notes, and snippets.

@rnyak
Created November 30, 2022 23:41
Show Gist options
  • Save rnyak/6bf39e8e762f032d095c97913c00fe1a to your computer and use it in GitHub Desktop.
Save rnyak/6bf39e8e762f032d095c97913c00fe1a to your computer and use it in GitHub Desktop.
import os
import glob
import numpy as np
import pandas as pd
import cudf
import cupy as cp
import nvtabular as nvt
from nvtabular.ops import *
from merlin.schema.tags import Tags
INPUT_DATA_DIR = os.environ.get("INPUT_DATA_DIR", "/workspace/data/")
NUM_ROWS = 100000

# ---------------------------------------------------------------------------
# Synthetic interaction log: one row per (session, item) interaction.
# The np.random draws happen in this order: item ids, session ids, age_days,
# weekday_sin, per-session day. Keep that order so that a run seeded through
# the global NumPy RNG keeps producing the same data.
# ---------------------------------------------------------------------------

# Long-tailed item popularity: log-normal draws truncated to int32 and
# clipped into [1, 50000].
long_tailed_item_distribution = (
    np.random.lognormal(mean=3., sigma=1., size=NUM_ROWS)
    .astype(np.int32)
    .clip(1, 50000)
)

# Session ids are uniform random integers in [70000, 90000).
df = pd.DataFrame(
    {'session_id': np.random.randint(low=70000, high=90000, size=NUM_ROWS)}
)
df['item_id'] = long_tailed_item_distribution

# Item -> category: split the observed item_id range into 334 equal-width bins
# labelled 1..334, so item ids falling in the same bin share a category.
item_bins = pd.cut(df['item_id'], bins=334, labels=np.arange(1, 335))
df['category'] = item_bins.astype(np.int32)

# Two continuous per-interaction features, uniform in [0, 1), stored as float32.
for feature_col in ('age_days', 'weekday_sin'):
    df[feature_col] = np.random.uniform(low=0, high=1, size=NUM_ROWS).astype(np.float32)

# Give each distinct session one random day in 1..9 and broadcast it to every
# row of that session.
unique_sessions = df['session_id'].unique()
session_days = np.random.randint(1, 10, size=len(unique_sessions))
map_day = {sid: day for sid, day in zip(unique_sessions, session_days)}
df['day'] = df['session_id'].map(map_day)
SESSIONS_MAX_LENGTH = 20

# Encode the categorical id columns as integers. start_index=1 shifts the
# encoding upward — presumably to keep 0 free for sequence padding; confirm
# against the installed NVTabular Categorify semantics.
categ_feats = ['session_id', 'item_id', 'category'] >> nvt.ops.Categorify(start_index=1)

# Inputs to the session-level Groupby: the encoded ids plus the raw day and
# continuous columns.
groupby_feats = categ_feats + ['day', 'age_days', 'weekday_sin']

# Aggregate interactions into one row per session. Output columns are named
# "<column>-<aggregation>" (name_sep="-"), e.g. "item_id-list",
# "item_id-count", "day-first".
# NOTE(review): no sort_cols is passed and the data has no timestamp column,
# so the element order inside each list is whatever the groupby yields; the
# "last SESSIONS_MAX_LENGTH" kept below is only meaningful if that order is.
session_aggs = {
    "item_id": ["list", "count"],
    "category": ["list"],
    "day": ["first"],
    "age_days": ["list"],
    "weekday_sin": ["list"],
}
groupby_features = groupby_feats >> nvt.ops.Groupby(
    groupby_cols=["session_id"],
    aggs=session_aggs,
    name_sep="-",
)


def _truncate_sequence(node, *tagging_ops):
    """Keep the last SESSIONS_MAX_LENGTH entries of each list column in `node`.

    Any `tagging_ops` are chained right after the slice, and ValueCount is
    applied last. Returns the resulting workflow node.
    """
    node = node >> nvt.ops.ListSlice(-SESSIONS_MAX_LENGTH)
    for op in tagging_ops:
        node = node >> op
    return node >> nvt.ops.ValueCount()


# Truncated sequence features: category ids, item ids (tagged as the item id
# column), and the two continuous features (tagged CONTINUOUS).
sequence_features_truncated = _truncate_sequence(groupby_features['category-list'])
sequence_features_truncated_item = _truncate_sequence(
    groupby_features['item_id-list'],
    TagAsItemID(),
)
sequence_features_truncated_cont = _truncate_sequence(
    groupby_features['age_days-list', 'weekday_sin-list'],
    nvt.ops.AddMetadata(tags=[Tags.CONTINUOUS]),
)
# Sessions with fewer than MINIMUM_SESSION_LENGTH interactions are dropped:
# a single-interaction session cannot supply both an input sequence and a
# next-item target for training/evaluation.
MINIMUM_SESSION_LENGTH = 2

session_level_cols = groupby_features['item_id-count', 'day-first', 'session_id']
selected_features = (
    session_level_cols
    + sequence_features_truncated_item
    + sequence_features_truncated
    + sequence_features_truncated_cont
)

# Keep only rows (sessions) whose "item_id-count" meets the minimum length.
filtered_sessions = selected_features >> nvt.ops.Filter(
    f=lambda sessions: sessions["item_id-count"] >= MINIMUM_SESSION_LENGTH
)

# Final output: session-level scalars plus the four truncated list columns.
seq_feats_list = (
    filtered_sessions['item_id-list', 'category-list', 'age_days-list', 'weekday_sin-list']
    >> nvt.ops.ValueCount()
)
workflow = nvt.Workflow(
    filtered_sessions['session_id', 'day-first', 'item_id-count'] + seq_feats_list
)
# cpu=False selects NVTabular's GPU (cuDF) backend for this dataset.
dataset = nvt.Dataset(df, cpu=False)

# Fit exactly once. This computes the statistics the ops need (e.g. the
# Categorify encodings). Calling fit() and then fit_transform() would fit the
# whole workflow a second time.
workflow.fit(dataset)

# Apply the already-fitted workflow and write the result as parquet under
# <INPUT_DATA_DIR>/processed_nvt.
sessions_ds = workflow.transform(dataset)
sessions_ds.to_parquet(os.path.join(INPUT_DATA_DIR, "processed_nvt"))

# Persist the fitted workflow so the same encodings can be reapplied later.
# NOTE(review): this path is relative to the current working directory, not
# INPUT_DATA_DIR — confirm that is intended.
workflow.save('workflow_etl')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment