HugeCTR Debugging
DeepFM network trainer script (entry point: trains, saves, and evaluates the model)
"""DeepFM Network trainer.""" | |
import argparse | |
import json | |
import logging | |
import os | |
import sys | |
import time | |
import hugectr | |
from hugectr.inference import CreateInferenceSession | |
from hugectr.inference import InferenceParams | |
from model import create_model | |
import utils | |
SNAPSHOT_DIR = 'snapshots' | |
HYPERTUNE_METRIC_NAME = 'AUC' | |
LOCAL_MODEL_DIR = '/tmp/saved_model' | |
LOCAL_CHECKPOINT_DIR = '/tmp/checkpoints' | |
def set_job_dirs():
    """Sets job directories based on env variables set by Vertex AI."""

    # Vertex AI mounts Cloud Storage buckets under /gcs/ (Cloud Storage FUSE),
    # so gs:// URIs are rewritten to the corresponding local FUSE paths.
    model_dir = os.getenv('AIP_MODEL_DIR', LOCAL_MODEL_DIR)
    if model_dir.startswith('gs://'):
        model_dir = model_dir.replace('gs://', '/gcs/')

    checkpoint_dir = os.getenv('AIP_CHECKPOINT_DIR', LOCAL_CHECKPOINT_DIR)
    if checkpoint_dir.startswith('gs://'):
        checkpoint_dir = checkpoint_dir.replace('gs://', '/gcs/')

    return model_dir, checkpoint_dir
def save_model(model, model_name, model_dir):
    """Saves model graph and model parameters."""

    parameters_path = os.path.join(model_dir, model_name)
    logging.info('Saving model parameters to: %s', parameters_path)
    model.save_params_to_files(prefix=parameters_path)

    graph_path = os.path.join(model_dir, f'{model_name}.json')
    logging.info('Saving model graph to: %s', graph_path)
    model.graph_to_json(graph_config_file=graph_path)
def evaluate_model(model_name,
                   model_dir,
                   eval_data_source,
                   num_batches,
                   slot_size_array,
                   max_batchsize=2048,
                   hit_rate_threshold=0.6,
                   device_id=0,
                   use_gpu_embedding_cache=True,
                   cache_size_percentage=0.6,
                   i64_input_key=True):
    """Evaluates a model on a validation dataset."""

    # The file names below follow the naming convention used by
    # Model.save_params_to_files(prefix=...) in save_model() above:
    # '{prefix}_dense_0.model' plus one '{prefix}<N>_sparse_0.model'
    # file per embedding table.
    dense_model_file = os.path.join(model_dir, f'{model_name}_dense_0.model')
    sparse_model_files = [
        os.path.join(model_dir, f'{model_name}0_sparse_0.model')
    ]

    inference_params = InferenceParams(
        model_name=model_name,
        max_batchsize=max_batchsize,
        hit_rate_threshold=hit_rate_threshold,
        dense_model_file=dense_model_file,
        sparse_model_files=sparse_model_files,
        device_id=device_id,
        use_gpu_embedding_cache=use_gpu_embedding_cache,
        cache_size_percentage=cache_size_percentage,
        i64_input_key=i64_input_key)

    model_config_path = os.path.join(model_dir, f'{model_name}.json')
    inference_session = CreateInferenceSession(
        model_config_path=model_config_path,
        inference_params=inference_params)

    eval_results = inference_session.evaluate(
        num_batches=num_batches,
        source=eval_data_source,
        data_reader_type=hugectr.DataReaderType_t.Parquet,
        check_type=hugectr.Check_t.Non,
        slot_size_array=slot_size_array)

    return eval_results
def main(args):
    """Runs a training loop."""

    repeat_dataset = False if args.num_epochs > 0 else True
    model_dir, snapshot_dir = set_job_dirs()
    num_gpus = sum(len(gpus) for gpus in args.gpus)
    batch_size = num_gpus * args.per_gpu_batch_size

    model = create_model(train_data=[args.train_data],
                         valid_data=args.valid_data,
                         max_eval_batches=args.max_eval_batches,
                         dropout_rate=args.dropout_rate,
                         num_dense_features=args.num_dense_features,
                         num_sparse_features=args.num_sparse_features,
                         num_workers=args.num_workers,
                         slot_size_array=args.slot_size_array,
                         nnz_per_slot=args.nnz_per_slot,
                         batchsize=batch_size,
                         lr=args.lr,
                         gpus=args.gpus,
                         repeat_dataset=repeat_dataset)
    model.summary()

    logging.info('Starting model training')
    model.fit(num_epochs=args.num_epochs,
              max_iter=args.max_iter,
              display=args.display_interval,
              eval_interval=args.eval_interval,
              snapshot=args.snapshot_interval,
              snapshot_prefix=os.path.join(snapshot_dir, args.model_name))

    logging.info('Saving model')
    save_model(model, args.model_name, model_dir)

    logging.info('Starting model evaluation using %s batches ...',
                 args.eval_batches)
    metric_value = evaluate_model(model_name=args.model_name,
                                  model_dir=model_dir,
                                  eval_data_source=args.valid_data,
                                  num_batches=args.eval_batches,
                                  device_id=0,
                                  max_batchsize=args.per_gpu_batch_size,
                                  slot_size_array=args.slot_size_array)
    logging.info('%s on the evaluation dataset: %s',
                 HYPERTUNE_METRIC_NAME, metric_value)
def parse_args():
    """Parses command line arguments."""

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_name',
                        type=str,
                        required=False,
                        default='deepfm',
                        help='Model name.')
    parser.add_argument('-t',
                        '--train_data',
                        type=str,
                        default='/criteo/criteo_processed_parquet_1/_train_file_list.txt',
                        help='Path to the training data _file_list.txt')
    parser.add_argument('-v',
                        '--valid_data',
                        type=str,
                        default='/criteo/criteo_processed_parquet_1/_test_file_list.txt',
                        help='Path to the validation data _file_list.txt')
    parser.add_argument('--schema',
                        type=str,
                        default='/criteo/criteo_processed_parquet_1/schema.pbtxt',
                        help='Path to the schema.pbtxt file')
    parser.add_argument('--slot_size_array',
                        type=str,
                        default=('9999999 39061 17296 7425 20266 4 7123 1544 64 '
                                 '9999999 3067956 405283 11 2209 11939 155 4 977 '
                                 '15 9999999 9999999 9999999 590152 12974 109 37'),
                        help='Space-separated cardinalities of the categorical columns')
    parser.add_argument('--dropout_rate',
                        type=float,
                        required=False,
                        default=0.5,
                        help='Dropout rate')
    parser.add_argument('--num_dense_features',
                        type=int,
                        required=False,
                        default=13,
                        help='Number of dense features')
    parser.add_argument('--num_sparse_features',
                        type=int,
                        required=False,
                        default=26,
                        help='Number of sparse features')
    parser.add_argument('--nnz_per_slot',
                        type=int,
                        required=False,
                        default=2,
                        help='Number of non-zeros (NNZ) per slot')
    parser.add_argument('--lr',
                        type=float,
                        required=False,
                        default=0.001,
                        help='Learning rate')
    parser.add_argument('-i',
                        '--max_iter',
                        type=int,
                        required=False,
                        default=5000,
                        help='Number of training iterations')
    parser.add_argument('--max_eval_batches',
                        type=int,
                        required=False,
                        default=300,
                        help='Max eval batches for evaluations during model.fit()')
    parser.add_argument('--eval_batches',
                        type=int,
                        required=False,
                        default=100,
                        help='Number of evaluation batches for the final evaluation')
    parser.add_argument('--num_epochs',
                        type=int,
                        required=False,
                        default=0,
                        help='Number of training epochs (0 trains for max_iter iterations instead)')
    parser.add_argument('-b',
                        '--per_gpu_batch_size',
                        type=int,
                        required=False,
                        default=2048,
                        help='Per-GPU batch size')
    parser.add_argument('-s',
                        '--snapshot_interval',
                        type=int,
                        required=False,
                        default=0,
                        help='Saves a model snapshot after the given number of iterations')
    parser.add_argument('--gpus',
                        type=str,
                        required=False,
                        default='[[0,1,2,3]]',
                        help='GPU devices to use for training, as a JSON list of lists (one inner list per node)')
    parser.add_argument('-r',
                        '--eval_interval',
                        type=int,
                        required=False,
                        default=1000,
                        help='Runs evaluation after the given number of iterations')
    parser.add_argument('--display_interval',
                        type=int,
                        required=False,
                        default=200,
                        help='Displays progress after the given number of iterations')
    parser.add_argument('--workspace_size_per_gpu',
                        type=int,
                        required=False,
                        default=300000,
                        help='Embedding workspace size per GPU in MB')
    parser.add_argument('--num_workers',
                        type=int,
                        required=False,
                        default=12,
                        help='Number of data reader workers')

    return parser.parse_args()
if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s - %(message)s',
                        level=logging.INFO,
                        datefmt='%d-%m-%y %H:%M:%S',
                        stream=sys.stdout)

    parsed_args = parse_args()

    parsed_args.gpus = json.loads(parsed_args.gpus)
    parsed_args.slot_size_array = [
        int(i) for i in parsed_args.slot_size_array.split()
    ]

    logging.info('Args: %s', parsed_args)
    start_time = time.time()
    logging.info('Starting training')

    main(parsed_args)

    end_time = time.time()
    elapsed_time = end_time - start_time
    logging.info('Training completed. Elapsed time: %s', elapsed_time)
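# ---------------------------------------------------------------------------
# Usage sketch (not part of the original gist). Assuming the trainer above is
# saved as, say, task.py (the actual file name is not shown here), a local
# single-node run could look like:
#
#   python task.py \
#       --train_data /criteo/criteo_processed_parquet_1/_train_file_list.txt \
#       --valid_data /criteo/criteo_processed_parquet_1/_test_file_list.txt \
#       --gpus '[[0]]' \
#       --per_gpu_batch_size 2048 \
#       --max_iter 5000
#
# --gpus is parsed with json.loads() into a list of lists (one inner list of
# device IDs per node), and --slot_size_array is split on whitespace into a
# list of 26 integer cardinalities.
# ---------------------------------------------------------------------------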
model.py: DeepFM network definition (imported by the trainer above)
from typing import List, Any

import hugectr
# Importing mpi4py initializes MPI, which HugeCTR uses for multi-node training.
from mpi4py import MPI


def create_model(
    train_data: List[Any],
    valid_data: str,
    slot_size_array: List[Any],
    gpus: List[Any],
    max_eval_batches: int = 300,
    batchsize: int = 2048,
    lr: float = 0.001,
    dropout_rate: float = 0.5,
    workspace_size_per_gpu: float = 5000,
    num_dense_features: int = 13,
    num_sparse_features: int = 26,
    nnz_per_slot: int = 2,
    num_workers: int = 12,
    repeat_dataset: bool = True,
):
    """DeepFM network (https://www.ijcai.org/Proceedings/2017/0239.pdf)."""

    if not gpus:
        gpus = [[0]]

    solver = hugectr.CreateSolver(max_eval_batches=max_eval_batches,
                                  batchsize_eval=batchsize,
                                  batchsize=batchsize,
                                  lr=lr,
                                  vvgpu=gpus,
                                  repeat_dataset=repeat_dataset,
                                  i64_input_key=True)

    reader = hugectr.DataReaderParams(
        data_reader_type=hugectr.DataReaderType_t.Parquet,
        source=train_data,
        eval_source=valid_data,
        # slot_size_array=slot_size_array,
        check_type=hugectr.Check_t.Non,
        num_workers=num_workers)

    optimizer = hugectr.CreateOptimizer(optimizer_type=hugectr.Optimizer_t.Adam,
                                        update_type=hugectr.Update_t.Global,
                                        beta1=0.9,
                                        beta2=0.999,
                                        epsilon=0.0000001)

    model = hugectr.Model(solver, reader, optimizer)
    # Input: 1 label, 13 dense features, and one sparse group ("data1") with
    # 26 categorical slots and up to nnz_per_slot non-zeros per slot.
    model.add(hugectr.Input(
        label_dim=1, label_name="label",
        dense_dim=num_dense_features, dense_name="dense",
        data_reader_sparse_param_array=[hugectr.DataReaderSparseParam(
            "data1", nnz_per_slot, False, num_sparse_features)]))

    # Embedding vector size 11 = 10 dims for the second-order FM factors
    # plus 1 dim for the first-order (linear) term.
    model.add(hugectr.SparseEmbedding(
        embedding_type=hugectr.Embedding_t.LocalizedSlotSparseEmbeddingHash,
        workspace_size_per_gpu_in_mb=workspace_size_per_gpu,
        embedding_vec_size=11,
        combiner="sum",
        sparse_embedding_name="sparse_embedding1",
        bottom_name="data1",
        optimizer=optimizer))

    # Split each 11-dim embedding into its 10-dim factor part (slice11) and
    # its 1-dim linear part (slice12), then flatten per sample:
    # 26 slots x 10 dims = 260, and 26 slots x 1 dim = 26.
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Reshape,
                                 bottom_names=["sparse_embedding1"],
                                 top_names=["reshape1"],
                                 leading_dim=11))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Slice,
                                 bottom_names=["reshape1"],
                                 top_names=["slice11", "slice12"],
                                 ranges=[(0, 10), (10, 11)]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Reshape,
                                 bottom_names=["slice11"],
                                 top_names=["reshape2"],
                                 leading_dim=260))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Reshape,
                                 bottom_names=["slice12"],
                                 top_names=["reshape3"],
                                 leading_dim=26))

    # Duplicate the 13 dense features and project them: a [13, 10] weight for
    # the factor/deep part and a [13, 1] weight for the linear part.
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Slice,
                                 bottom_names=["dense"],
                                 top_names=["slice21", "slice22"],
                                 ranges=[(0, 13), (0, 13)]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.WeightMultiply,
                                 bottom_names=["slice21"],
                                 top_names=["weight_multiply1"],
                                 weight_dims=[13, 10]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.WeightMultiply,
                                 bottom_names=["slice22"],
                                 top_names=["weight_multiply2"],
                                 weight_dims=[13, 1]))

    # concat1 (260 + 130 = 390 dims) feeds both the deep MLP branch (slice31)
    # and the second-order FM branch (slice32).
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Concat,
                                 bottom_names=["reshape2", "weight_multiply1"],
                                 top_names=["concat1"]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Slice,
                                 bottom_names=["concat1"],
                                 top_names=["slice31", "slice32"],
                                 ranges=[(0, 390), (0, 390)]))

    # Deep branch: three 400-unit FC + ReLU + Dropout blocks and a 1-unit head.
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.InnerProduct,
                                 bottom_names=["slice31"],
                                 top_names=["fc1"],
                                 num_output=400))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.ReLU,
                                 bottom_names=["fc1"],
                                 top_names=["relu1"]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Dropout,
                                 bottom_names=["relu1"],
                                 top_names=["dropout1"],
                                 dropout_rate=dropout_rate))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.InnerProduct,
                                 bottom_names=["dropout1"],
                                 top_names=["fc2"],
                                 num_output=400))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.ReLU,
                                 bottom_names=["fc2"],
                                 top_names=["relu2"]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Dropout,
                                 bottom_names=["relu2"],
                                 top_names=["dropout2"],
                                 dropout_rate=dropout_rate))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.InnerProduct,
                                 bottom_names=["dropout2"],
                                 top_names=["fc3"],
                                 num_output=400))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.ReLU,
                                 bottom_names=["fc3"],
                                 top_names=["relu3"]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Dropout,
                                 bottom_names=["relu3"],
                                 top_names=["dropout3"],
                                 dropout_rate=dropout_rate))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.InnerProduct,
                                 bottom_names=["dropout3"],
                                 top_names=["fc4"],
                                 num_output=1))

    # Second-order FM branch.
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.FmOrder2,
                                 bottom_names=["slice32"],
                                 top_names=["fmorder2"],
                                 out_dim=10))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.ReduceSum,
                                 bottom_names=["fmorder2"],
                                 top_names=["reducesum1"],
                                 axis=1))

    # First-order (linear) branch over the sparse and dense features.
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.Concat,
                                 bottom_names=["reshape3", "weight_multiply2"],
                                 top_names=["concat2"]))
    model.add(hugectr.DenseLayer(layer_type=hugectr.Layer_t.ReduceSum,
                                 bottom_names=["concat2"],
                                 top_names=["reducesum2"],
                                 axis=1))

    # Sum the deep, second-order, and first-order terms into the final logit.
    model.add(hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.Add,
        bottom_names=["fc4", "reducesum1", "reducesum2"],
        top_names=["add"]))
    model.add(hugectr.DenseLayer(
        layer_type=hugectr.Layer_t.BinaryCrossEntropyLoss,
        bottom_names=["add", "label"],
        top_names=["loss"]))

    model.compile()
    return model
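# ---------------------------------------------------------------------------
# Usage sketch (not in the original gist): building the network directly with
# the same defaults the trainer uses. The file-list paths and the 26-element
# slot_size_array are taken from the trainer's argparse defaults above and are
# assumptions about your environment. Guarded so that
# `from model import create_model` stays side-effect free.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    deepfm = create_model(
        train_data=['/criteo/criteo_processed_parquet_1/_train_file_list.txt'],
        valid_data='/criteo/criteo_processed_parquet_1/_test_file_list.txt',
        slot_size_array=[9999999, 39061, 17296, 7425, 20266, 4, 7123, 1544,
                         64, 9999999, 3067956, 405283, 11, 2209, 11939, 155,
                         4, 977, 15, 9999999, 9999999, 9999999, 590152,
                         12974, 109, 37],
        gpus=[[0]],
        batchsize=2048,
        repeat_dataset=True)
    deepfm.summary()
    # Short smoke-test run; a large snapshot interval avoids writing snapshots.
    deepfm.fit(max_iter=1000, display=200, eval_interval=500, snapshot=1000000)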
NVTabular preprocessing script for the Criteo dataset (Dask-CUDA based)
import os
import time

import numpy as np
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, performance_report
import nvtabular as nvt
from nvtabular.utils import device_mem_size
from nvtabular.ops import Categorify, FillMissing, Clip, Normalize


def create_criteo_nvt_workflow(client):
    """Creates an nvt.Workflow definition with all the transformation steps."""

    # Column definitions: 13 continuous (I1-I13) and 26 categorical (C1-C26).
    cont_names = ['I' + str(x) for x in range(1, 14)]
    cat_names = ['C' + str(x) for x in range(1, 27)]

    # Transformation pipeline
    num_buckets = 10000000
    categorify_op = Categorify(max_size=num_buckets)
    cat_features = cat_names >> categorify_op
    cont_features = cont_names >> FillMissing() >> Clip(
        min_value=0) >> Normalize()
    features = cat_features + cont_features + ['label']

    # NOTE: queried before the workflow has been fit, so the reported sizes
    # may not reflect the final per-column cardinalities.
    embeddings_dict_cat = categorify_op.get_embedding_sizes(cat_names)
    embeddings = [embeddings_dict_cat[c][0] for c in cat_names]
    print("first time print embedding")
    print(embeddings)

    # Create and return the workflow
    return nvt.Workflow(features, client)
def analyze_dataset(workflow, dataset):
    """Calculates statistics for a given workflow."""
    workflow.fit(dataset)
    return workflow


def transform_dataset(dataset, workflow):
    """Applies the workflow transformations to the dataset."""
    dataset = workflow.transform(dataset)
    return dataset


def load_workflow(workflow_path, client):
    """Loads a workflow definition from a path."""
    return nvt.Workflow.load(workflow_path, client)
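# Note: the helpers above (create_criteo_nvt_workflow, analyze_dataset,
# transform_dataset, load_workflow) are not called by main() below; main()
# builds, fits, and saves its own workflow inline.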
def main():
    # Number of output files after converting CSV to Parquet
    num_output_files_train = 32
    num_output_files_valid = 8

    # data_path = '/criteo/criteo_raw_parquet_train'
    data_path = '/criteo/criteo_raw_parquet'
    # output_path = '/criteo/criteo_processed_parquet_train'
    output_path = '/criteo/criteo_processed_parquet_1'
    path = '/scripts/workspace_2022/merlin/workflow_1'

    n_workers = 4
    frac_size = 0.10
    memory_limit = 100_000_000_000

    device_size = device_mem_size()
    device_limit_frac, device_pool_frac = 0.60, 0.90
    device_limit = int(device_limit_frac * device_size)
    device_pool_size = int(device_pool_frac * device_size)
    rmm_pool_size = (device_pool_size // 256) * 256

    # Spin up a local Dask-CUDA cluster
    cluster = LocalCUDACluster(
        n_workers=n_workers,
        memory_limit=memory_limit,
        device_memory_limit=device_limit,
        rmm_pool_size=rmm_pool_size,
    )
    client = Client(cluster)
    print(client)

    # Define the dataset
    dataset = nvt.Dataset(
        data_path,
        engine="parquet",
        part_mem_fraction=frac_size,
        client=client,
    )

    # Create the workflow: 13 continuous (I*) and 26 categorical (C*) columns
    cont_names = ['I' + str(x) for x in range(1, 14)]
    cat_names = ['C' + str(x) for x in range(1, 27)]

    # Transformation pipeline
    num_buckets = 10000000
    categorify_op = Categorify(max_size=num_buckets)
    cat_features = cat_names >> categorify_op
    cont_features = cont_names >> FillMissing() >> Clip(
        min_value=0) >> Normalize()
    features = cat_features + cont_features + ['label']

    criteo_workflow = nvt.Workflow(features, client)
    criteo_workflow = criteo_workflow.fit(dataset)
    criteo_workflow.save(path)

    ## -------------------- ##
    ##  Getting slot sizes  ##
    ## -------------------- ##
    embeddings_dict_cat = categorify_op.get_embedding_sizes(cat_names)
    embeddings = [embeddings_dict_cat[c][0] for c in cat_names]
    print(embeddings)
    ## -------------------- ##

    CATEGORICAL_COLUMNS = ["C" + str(x) for x in range(1, 27)]
    CONTINUOUS_COLUMNS = ["I" + str(x) for x in range(1, 14)]
    LABEL_COLUMNS = ["label"]

    dict_dtypes = {}
    for col in CATEGORICAL_COLUMNS:
        dict_dtypes[col] = np.int64
    for col in CONTINUOUS_COLUMNS:
        dict_dtypes[col] = np.float32
    for col in LABEL_COLUMNS:
        dict_dtypes[col] = np.float32

    # Apply the fitted workflow so the written Parquet files contain the
    # Categorify/FillMissing/Clip/Normalize outputs rather than the raw columns.
    criteo_workflow.transform(dataset).to_parquet(
        output_path=output_path,
        shuffle=None,
        output_files=num_output_files_train,
        # output_files=num_output_files_valid,
        dtypes=dict_dtypes,
        cats=CATEGORICAL_COLUMNS,
        conts=CONTINUOUS_COLUMNS,
        labels=LABEL_COLUMNS,
    )


if __name__ == "__main__":
    main()
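# ---------------------------------------------------------------------------
# Bridging sketch (not in the original script): the `embeddings` list printed
# by main() holds the per-column cardinalities computed by Categorify, and the
# trainer above expects the same values as a space-separated string. A minimal
# way to produce the flag value:
#
#     slot_size_flag = ' '.join(str(c) for c in embeddings)
#     print(f"--slot_size_array '{slot_size_flag}'")
#
# The commented-out paths and output_files=num_output_files_valid suggest the
# script is re-run with swapped input/output paths to produce the validation
# split.
# ---------------------------------------------------------------------------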