HugeCTR Debugging
"""DeepFM Network trainer."""
import argparse
import json
import logging
import os
import sys
import time
import hugectr
from hugectr.inference import CreateInferenceSession
from hugectr.inference import InferenceParams
from model import create_model
import utils
SNAPSHOT_DIR = 'snapshots'
HYPERTUNE_METRIC_NAME = 'AUC'
LOCAL_MODEL_DIR = '/tmp/saved_model'
LOCAL_CHECKPOINT_DIR = '/tmp/checkpoints'
def set_job_dirs():
    """Sets job directories based on env variables set by Vertex AI."""
    # Vertex AI mounts Cloud Storage via FUSE, so a gs://bucket/... URI is
    # reachable as /gcs/bucket/... inside the training container.
    model_dir = os.getenv('AIP_MODEL_DIR', LOCAL_MODEL_DIR)
    if model_dir[0:5] == 'gs://':
        model_dir = model_dir.replace('gs://', '/gcs/')
    checkpoint_dir = os.getenv('AIP_CHECKPOINT_DIR', LOCAL_CHECKPOINT_DIR)
    if checkpoint_dir[0:5] == 'gs://':
        checkpoint_dir = checkpoint_dir.replace('gs://', '/gcs/')
    return model_dir, checkpoint_dir
def save_model(model, model_name, model_dir):
    """Saves model graph and model parameters."""
    # save_params_to_files writes the dense/sparse parameter files that
    # evaluate_model() below loads ('<name>_dense_0.model', '<name>0_sparse_0.model').
    parameters_path = os.path.join(model_dir, model_name)
    logging.info('Saving model parameters to: %s', parameters_path)
    model.save_params_to_files(prefix=parameters_path)
    graph_path = os.path.join(model_dir, f'{model_name}.json')
    logging.info('Saving model graph to: %s', graph_path)
    model.graph_to_json(graph_config_file=graph_path)
def evaluate_model(model_name,
                   model_dir,
                   eval_data_source,
                   num_batches,
                   slot_size_array,
                   max_batchsize=2048,
                   hit_rate_threshold=0.6,
                   device_id=0,
                   use_gpu_embedding_cache=True,
                   cache_size_percentage=0.6,
                   i64_input_key=True):
    """Evaluates a model on a validation dataset."""
    dense_model_file = os.path.join(model_dir, f'{model_name}_dense_0.model')
    sparse_model_files = [os.path.join(model_dir, f'{model_name}0_sparse_0.model')]
    inference_params = InferenceParams(
        model_name=model_name,
        max_batchsize=max_batchsize,
        hit_rate_threshold=hit_rate_threshold,
        dense_model_file=dense_model_file,
        sparse_model_files=sparse_model_files,
        device_id=device_id,
        use_gpu_embedding_cache=use_gpu_embedding_cache,
        cache_size_percentage=cache_size_percentage,
        i64_input_key=i64_input_key)
    model_config_path = os.path.join(model_dir, f'{model_name}.json')
    inference_session = CreateInferenceSession(
        model_config_path=model_config_path,
        inference_params=inference_params)
    eval_results = inference_session.evaluate(
        num_batches=num_batches,
        source=eval_data_source,
        data_reader_type=hugectr.DataReaderType_t.Parquet,
        check_type=hugectr.Check_t.Non,
        slot_size_array=slot_size_array)
    return eval_results
def main(args):
    """Runs a training loop."""
    repeat_dataset = False if args.num_epochs > 0 else True
    model_dir, snapshot_dir = set_job_dirs()
    num_gpus = sum([len(gpus) for gpus in args.gpus])
    batch_size = num_gpus * args.per_gpu_batch_size
    model = create_model(train_data=[args.train_data],
                         valid_data=args.valid_data,
                         max_eval_batches=args.max_eval_batches,
                         dropout_rate=args.dropout_rate,
                         num_dense_features=args.num_dense_features,
                         num_sparse_features=args.num_sparse_features,
                         num_workers=args.num_workers,
                         slot_size_array=args.slot_size_array,
                         nnz_per_slot=args.nnz_per_slot,
                         batchsize=batch_size,
                         lr=args.lr,
                         gpus=args.gpus,
                         repeat_dataset=repeat_dataset)
    model.summary()
    logging.info('Starting model training')
    model.fit(num_epochs=args.num_epochs,
              max_iter=args.max_iter,
              display=args.display_interval,
              eval_interval=args.eval_interval,
              snapshot=args.snapshot_interval,
              snapshot_prefix=os.path.join(snapshot_dir, args.model_name))
    logging.info('Saving model')
    save_model(model, args.model_name, model_dir)
    logging.info('Starting model evaluation using %s batches ...',
                 args.eval_batches)
    metric_value = evaluate_model(model_name=args.model_name,
                                  model_dir=model_dir,
                                  eval_data_source=args.valid_data,
                                  num_batches=args.eval_batches,
                                  device_id=0,
                                  max_batchsize=args.per_gpu_batch_size,
                                  slot_size_array=args.slot_size_array)
    logging.info('%s on the evaluation dataset: %s',
                 HYPERTUNE_METRIC_NAME, metric_value)
def parse_args():
    """Parses command line arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_name',
                        type=str,
                        required=False,
                        default='deepfm',
                        help='Model name.')
    parser.add_argument('-t',
                        '--train_data',
                        type=str,
                        default='/criteo/criteo_processed_parquet_1/_train_file_list.txt',
                        help='Path to training data _file_list.txt')
    parser.add_argument('-v',
                        '--valid_data',
                        type=str,
                        default='/criteo/criteo_processed_parquet_1/_test_file_list.txt',
                        help='Path to validation data _file_list.txt')
    parser.add_argument('--schema',
                        type=str,
                        default='/criteo/criteo_processed_parquet_1/schema.pbtxt',
                        help='Path to the schema.pbtxt file')
    parser.add_argument('--slot_size_array',
                        type=str,
                        default=('9999999 39061 17296 7425 20266 4 7123 1544 64 '
                                 '9999999 3067956 405283 11 2209 11939 155 4 977 15 '
                                 '9999999 9999999 9999999 590152 12974 109 37'),
                        help='Space-separated list of cardinalities of the categorical columns')
    parser.add_argument('--dropout_rate',
                        type=float,
                        required=False,
                        default=0.5,
                        help='Dropout rate')
    parser.add_argument('--num_dense_features',
                        type=int,
                        required=False,
                        default=13,
                        help='Number of dense features')
    parser.add_argument('--num_sparse_features',
                        type=int,
                        required=False,
                        default=26,
                        help='Number of sparse features')
    parser.add_argument('--nnz_per_slot',
                        type=int,
                        required=False,
                        default=2,
                        help='NNZ per slot')
    parser.add_argument('--lr',
                        type=float,
                        required=False,
                        default=0.001,
                        help='Learning rate')
    parser.add_argument('-i',
                        '--max_iter',
                        type=int,
                        required=False,
                        default=5000,
                        help='Number of training iterations')
    parser.add_argument('--max_eval_batches',
                        type=int,
                        required=False,
                        default=300,
                        help='Max eval batches for evaluations during model.fit()')
    parser.add_argument('--eval_batches',
                        type=int,
                        required=False,
                        default=100,
                        help='Number of evaluation batches for the final evaluation')
    parser.add_argument('--num_epochs',
                        type=int,
                        required=False,
                        default=0,
                        help='Number of training epochs')
    parser.add_argument('-b',
                        '--per_gpu_batch_size',
                        type=int,
                        required=False,
                        default=2048,
                        help='Per GPU batch size')
    parser.add_argument('-s',
                        '--snapshot_interval',
                        type=int,
                        required=False,
                        default=0,
                        help='Saves a model snapshot after the given number of iterations')
    parser.add_argument('--gpus',
                        type=str,
                        required=False,
                        default='[[0,1,2,3]]',
                        help='GPU devices to use for training, e.g. "[[0,1,2,3]]"')
    parser.add_argument('-r',
                        '--eval_interval',
                        type=int,
                        required=False,
                        default=1000,
                        help='Run evaluation after the given number of iterations')
    parser.add_argument('--display_interval',
                        type=int,
                        required=False,
                        default=200,
                        help='Display progress after the given number of iterations')
    parser.add_argument('--workspace_size_per_gpu',
                        type=int,
                        required=False,
                        default=300000,
                        help='Workspace size per GPU in MB')
    parser.add_argument('--num_workers',
                        type=int,
                        required=False,
                        default=12,
                        help='Number of workers')
    return parser.parse_args()
if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s - %(message)s',
                        level=logging.INFO,
                        datefmt='%d-%m-%y %H:%M:%S',
                        stream=sys.stdout)
    parsed_args = parse_args()
    parsed_args.gpus = json.loads(parsed_args.gpus)
    parsed_args.slot_size_array = [
        int(i) for i in parsed_args.slot_size_array.split(sep=' ')
    ]
    logging.info('Args: %s', parsed_args)
    start_time = time.time()
    logging.info('Starting training')
    main(parsed_args)
    end_time = time.time()
    elapsed_time = end_time - start_time
    logging.info('Training completed. Elapsed time: %s', elapsed_time)
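For a quick smoke test outside Vertex AI, main() can also be driven directly with a namespace that mirrors the parse_args() defaults above. A minimal sketch, assuming the trainer is saved as train.py and the processed Criteo file lists exist at the default paths; the module name and the reduced max_iter are assumptions, not part of the original gist:

# Hypothetical smoke test for the trainer above; 'train' is an assumed module name.
import argparse
from train import main

slot_size_array = [9999999, 39061, 17296, 7425, 20266, 4, 7123, 1544, 64,
                   9999999, 3067956, 405283, 11, 2209, 11939, 155, 4, 977, 15,
                   9999999, 9999999, 9999999, 590152, 12974, 109, 37]

args = argparse.Namespace(
    model_name='deepfm',
    train_data='/criteo/criteo_processed_parquet_1/_train_file_list.txt',
    valid_data='/criteo/criteo_processed_parquet_1/_test_file_list.txt',
    slot_size_array=slot_size_array,
    gpus=[[0]],                 # single GPU for the smoke test
    per_gpu_batch_size=2048,
    num_epochs=0,               # 0 -> iterate by max_iter with repeat_dataset=True
    max_iter=500,               # shortened run; the default is 5000
    max_eval_batches=300,
    eval_batches=100,
    eval_interval=250,
    display_interval=100,
    snapshot_interval=0,
    dropout_rate=0.5,
    num_dense_features=13,
    num_sparse_features=26,
    nnz_per_slot=2,
    num_workers=12,
    lr=0.001)

main(args)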
# model.py: the network definition imported above via `from model import create_model`.
from typing import List, Any

import hugectr
from mpi4py import MPI
def create_model(train_data: List[Any],
                 valid_data: str,
                 slot_size_array: List[Any],
                 gpus: List[Any],
                 max_eval_batches: int = 300,
                 batchsize: int = 2048,
                 lr: float = 0.001,
                 dropout_rate: float = 0.5,
                 workspace_size_per_gpu: float = 5000,
                 num_dense_features: int = 13,
                 num_sparse_features: int = 26,
                 nnz_per_slot: int = 2,
                 num_workers: int = 12,
                 repeat_dataset: bool = True):
    """DeepFM network (https://www.ijcai.org/Proceedings/2017/0239.pdf)."""
    if not gpus:
        gpus = [[0]]

    solver = hugectr.CreateSolver(max_eval_batches=max_eval_batches,
                                  batchsize_eval=batchsize,
                                  batchsize=batchsize,
                                  lr=lr,
                                  vvgpu=gpus,
                                  repeat_dataset=repeat_dataset,
                                  i64_input_key=True)
    reader = hugectr.DataReaderParams(
        data_reader_type=hugectr.DataReaderType_t.Parquet,
        source=train_data,
        eval_source=valid_data,
        # slot_size_array=slot_size_array,
        check_type=hugectr.Check_t.Non,
        num_workers=num_workers)
    optimizer = hugectr.CreateOptimizer(optimizer_type=hugectr.Optimizer_t.Adam,
                                        update_type=hugectr.Update_t.Global,
                                        beta1=0.9,
                                        beta2=0.999,
                                        epsilon=0.0000001)
    model = hugectr.Model(solver, reader, optimizer)
    model.add(hugectr.Input(
        label_dim=1, label_name="label",
        dense_dim=num_dense_features, dense_name="dense",
        data_reader_sparse_param_array=[hugectr.DataReaderSparseParam(
            "data1", nnz_per_slot, False, num_sparse_features)]))
    model.add(hugectr.SparseEmbedding(
        embedding_type=hugectr.Embedding_t.LocalizedSlotSparseEmbeddingHash,
        workspace_size_per_gpu_in_mb=workspace_size_per_gpu,
        embedding_vec_size=11,
        combiner="sum",
        sparse_embedding_name="sparse_embedding1",
        bottom_name="data1",
        optimizer=optimizer))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Reshape,
                                 bottom_names=["sparse_embedding1"],
                                 top_names=["reshape1"],
                                 leading_dim=11))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Slice,
                                 bottom_names=["reshape1"],
                                 top_names=["slice11", "slice12"],
                                 ranges=[(0, 10), (10, 11)]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Reshape,
                                 bottom_names=["slice11"],
                                 top_names=["reshape2"],
                                 leading_dim=260))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Reshape,
                                 bottom_names=["slice12"],
                                 top_names=["reshape3"],
                                 leading_dim=26))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Slice,
                                 bottom_names=["dense"],
                                 top_names=["slice21", "slice22"],
                                 ranges=[(0, 13), (0, 13)]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.WeightMultiply,
                                 bottom_names=["slice21"],
                                 top_names=["weight_multiply1"],
                                 weight_dims=[13, 10]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.WeightMultiply,
                                 bottom_names=["slice22"],
                                 top_names=["weight_multiply2"],
                                 weight_dims=[13, 1]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Concat,
                                 bottom_names=["reshape2", "weight_multiply1"],
                                 top_names=["concat1"]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Slice,
                                 bottom_names=["concat1"],
                                 top_names=["slice31", "slice32"],
                                 ranges=[(0, 390), (0, 390)]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.InnerProduct,
                                 bottom_names=["slice31"],
                                 top_names=["fc1"],
                                 num_output=400))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.ReLU,
                                 bottom_names=["fc1"],
                                 top_names=["relu1"]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Dropout,
                                 bottom_names=["relu1"],
                                 top_names=["dropout1"],
                                 dropout_rate=dropout_rate))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.InnerProduct,
                                 bottom_names=["dropout1"],
                                 top_names=["fc2"],
                                 num_output=400))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.ReLU,
                                 bottom_names=["fc2"],
                                 top_names=["relu2"]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Dropout,
                                 bottom_names=["relu2"],
                                 top_names=["dropout2"],
                                 dropout_rate=dropout_rate))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.InnerProduct,
                                 bottom_names=["dropout2"],
                                 top_names=["fc3"],
                                 num_output=400))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.ReLU,
                                 bottom_names=["fc3"],
                                 top_names=["relu3"]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Dropout,
                                 bottom_names=["relu3"],
                                 top_names=["dropout3"],
                                 dropout_rate=dropout_rate))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.InnerProduct,
                                 bottom_names=["dropout3"],
                                 top_names=["fc4"],
                                 num_output=1))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.FmOrder2,
                                 bottom_names=["slice32"],
                                 top_names=["fmorder2"],
                                 out_dim=10))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.ReduceSum,
                                 bottom_names=["fmorder2"],
                                 top_names=["reducesum1"],
                                 axis=1))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Concat,
                                 bottom_names=["reshape3", "weight_multiply2"],
                                 top_names=["concat2"]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.ReduceSum,
                                 bottom_names=["concat2"],
                                 top_names=["reducesum2"],
                                 axis=1))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Add,
                                 bottom_names=["fc4", "reducesum1", "reducesum2"],
                                 top_names=["add"]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.BinaryCrossEntropyLoss,
                                 bottom_names=["add", "label"],
                                 top_names=["loss"]))
    model.compile()
    return model
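The network definition can also be built on its own to inspect the generated layer graph. A minimal sketch with placeholder cardinalities (the real slot sizes come from the NVTabular Categorify statistics in the preprocessing script below); note that create_model() currently ignores slot_size_array because the corresponding DataReaderParams argument is commented out. The placeholder slot sizes and output path are assumptions:

# Standalone graph inspection sketch; placeholder values only.
from model import create_model

slot_size_array = [10_000_000] * 26  # placeholder; Categorify(max_size=10_000_000) caps real cardinalities

model = create_model(
    train_data=['/criteo/criteo_processed_parquet_1/_train_file_list.txt'],
    valid_data='/criteo/criteo_processed_parquet_1/_test_file_list.txt',
    slot_size_array=slot_size_array,
    gpus=[[0]],
    batchsize=2048)
model.summary()                                            # prints the layer-by-layer graph
model.graph_to_json(graph_config_file='/tmp/deepfm.json')  # same call save_model() uses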
# Preprocessing script: NVTabular/Dask workflow that converts the raw Criteo
# Parquet data into the processed dataset consumed by the trainer above.
import os
import time

import numpy as np
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, performance_report
import nvtabular as nvt
from nvtabular.utils import device_mem_size
from nvtabular.ops import Categorify, FillMissing, Clip, Normalize
def create_criteo_nvt_workflow(client):
    """Creates an nvt.Workflow definition with all the transformation steps."""
    # Column definitions
    cont_names = ['I' + str(x) for x in range(1, 14)]
    cat_names = ['C' + str(x) for x in range(1, 27)]

    # Transformation pipeline
    num_buckets = 10000000
    categorify_op = Categorify(max_size=num_buckets)
    cat_features = cat_names >> categorify_op
    cont_features = cont_names >> FillMissing() >> Clip(min_value=0) >> Normalize()
    features = cat_features + cont_features + ['label']

    # Embedding sizes (slot_size_array) reported by the Categorify op
    embeddings_dict_cat = categorify_op.get_embedding_sizes(cat_names)
    embeddings = [embeddings_dict_cat[c][0] for c in cat_names]
    print("first time print embedding")
    print(embeddings)

    # Create and return the workflow
    return nvt.Workflow(features, client)
def analyze_dataset(workflow, dataset):
    """Calculates statistics for a given workflow."""
    workflow.fit(dataset)
    return workflow
def transform_dataset(dataset, workflow):
    """Applies the workflow transformations to the dataset."""
    # workflow.transform returns a new (lazy) nvt.Dataset; return it rather
    # than the untransformed input.
    return workflow.transform(dataset)
def load_workflow(workflow_path, client):
    """Loads a workflow definition from a path."""
    return nvt.Workflow.load(workflow_path, client)
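Once saved, the workflow can be reloaded to process another split (for example the validation data) with the same fitted statistics. A minimal sketch; only the workflow path matches main() below, the validation input and output paths are assumptions:

# Hypothetical re-use of the saved workflow; paths other than the workflow path are assumptions.
import nvtabular as nvt

workflow = load_workflow('/scripts/workspace_2022/merlin/workflow_1', client=None)
valid_dataset = nvt.Dataset('/criteo/criteo_raw_parquet_valid', engine='parquet')
workflow.transform(valid_dataset).to_parquet(output_path='/criteo/criteo_processed_parquet_valid')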
def main():
    num_output_files_train = 32  # Number of output files after converting CSV to Parquet
    num_output_files_valid = 8   # Number of output files after converting CSV to Parquet

    # data_path = '/criteo/criteo_raw_parquet_train'
    data_path = '/criteo/criteo_raw_parquet'
    # output_path = '/criteo/criteo_processed_parquet_train'
    output_path = '/criteo/criteo_processed_parquet_1'
    path = '/scripts/workspace_2022/merlin/workflow_1'

    n_workers = 4
    frac_size = 0.10
    num_buckets = 10_000_000
    memory_limit = 100_000_000_000
    device_size = device_mem_size()
    device_limit_frac, device_pool_frac = 0.60, 0.90
    device_limit = int(device_limit_frac * device_size)
    device_pool_size = int(device_pool_frac * device_size)
    rmm_pool_size = (device_pool_size // 256) * 256

    # Spin up local cluster
    cluster = LocalCUDACluster(
        n_workers=n_workers,
        memory_limit=memory_limit,
        device_memory_limit=device_limit,
        rmm_pool_size=rmm_pool_size,
    )
    client = Client(cluster)
    print(client)

    # Define dataset
    dataset = nvt.Dataset(
        data_path,
        engine="parquet",
        part_mem_fraction=frac_size,
        client=client,
    )

    # Create Workflow
    cont_names = ['I' + str(x) for x in range(1, 14)]
    cat_names = ['C' + str(x) for x in range(1, 27)]

    # Transformation pipeline
    num_buckets = 10000000
    categorify_op = Categorify(max_size=num_buckets)
    cat_features = cat_names >> categorify_op
    cont_features = cont_names >> FillMissing() >> Clip(min_value=0) >> Normalize()
    features = cat_features + cont_features + ['label']
    criteo_workflow = nvt.Workflow(features, client)
    criteo_workflow = criteo_workflow.fit(dataset)
    criteo_workflow.save(path)

    ## Getting slot size ##
    ##-------------------##
    embeddings_dict_cat = categorify_op.get_embedding_sizes(cat_names)
    embeddings = [embeddings_dict_cat[c][0] for c in cat_names]
    print(embeddings)
    ##-------------------##

    CATEGORICAL_COLUMNS = ["C" + str(x) for x in range(1, 27)]
    CONTINUOUS_COLUMNS = ["I" + str(x) for x in range(1, 14)]
    LABEL_COLUMNS = ["label"]
    dict_dtypes = {}
    for col in CATEGORICAL_COLUMNS:
        dict_dtypes[col] = np.int64
    for col in CONTINUOUS_COLUMNS:
        dict_dtypes[col] = np.float32
    for col in LABEL_COLUMNS:
        dict_dtypes[col] = np.float32

    dataset.to_parquet(
        output_path=output_path,
        shuffle=None,
        output_files=num_output_files_train,
        # output_files=num_output_files_valid,
        dtypes=dict_dtypes,
        cats=CATEGORICAL_COLUMNS,
        conts=CONTINUOUS_COLUMNS,
        labels=LABEL_COLUMNS
    )
if __name__ == "__main__":
    main()
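For reference, the usual NVTabular pattern applies the fitted workflow before writing, so that the Parquet output contains the categorified and normalized columns rather than the raw ones; main() above writes dataset directly. A minimal sketch reusing criteo_workflow, dataset, and the column lists defined in main(), shown as the standard pattern rather than a confirmed fix:

# Standard transform-then-write pattern; all variable names come from main() above.
criteo_workflow.transform(dataset).to_parquet(
    output_path=output_path,
    shuffle=None,
    output_files=num_output_files_train,
    dtypes=dict_dtypes,
    cats=CATEGORICAL_COLUMNS,
    conts=CONTINUOUS_COLUMNS,
    labels=LABEL_COLUMNS)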