Helper scripts to benchmark and check ignite's engine on the MNIST and CIFAR10 tasks.
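A usage sketch for the benchmark script below (the file name benchmark_engine.sh is an assumption; version, remote and bench_task are the environment variables the script reads, with the defaults shown in the script):

    version=v0.2.1 remote=pytorch bench_task=mnist bash benchmark_engine.sh
    version=master remote=vfdev-5 bench_task=contrib/cifar10 bash benchmark_engine.sh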
#!/bin/bash

# Tests configuration:
if [ -z "$version" ]; then
    export version="v0.2.1"
    # export version="master"
    # export version="engine_refactor"
    echo "Setup version: $version"
fi

if [ -z "$remote" ]; then
    export remote="pytorch"
    # export remote="vfdev-5"
    echo "Setup remote: $remote"
fi

if [ -z "$bench_task" ]; then
    export bench_task="mnist"
    echo "Setup benchmark task: $bench_task"
fi

if [ "$bench_task" == "mnist" ]; then
    export bench_prep_pycmd="from torchvision.datasets import MNIST; MNIST(download=True, root='.')"
    export bench_cmd="python mnist.py --epochs 5"
elif [ "$bench_task" == "contrib/cifar10" ]; then
    export bench_prep_pycmd="from torchvision import datasets; datasets.CIFAR10(download=True, root='.')"
    export bench_cmd='python main.py --params=num_epochs=5'
else
    echo "Unknown benchmark task: $bench_task"
    exit 1
fi

export env_name="benchmark_engine__${remote}_${version}"

echo "Check existing conda env: $env_name"
conda env list | grep "$env_name"
ret=$?

set -e

ignite_path="/tmp/ignite_${remote}_${version}"

if [ "$ret" == "1" ]; then
    echo "Setup conda env: $env_name"
    conda config --set always_yes yes --set changeps1 no
    conda create -q -n $env_name pytorch torchvision python=3.7 -c pytorch
    source activate $env_name
    pip install -q tqdm tensorboardX

    # Clone the repository and install ignite
    rm -rf ${ignite_path}
    git clone https://github.com/${remote}/ignite.git -b ${version} ${ignite_path}
    cd ${ignite_path} && pip install --upgrade -e .
else
    source activate $env_name
fi

echo "Check installed ignite:"
pip list | grep ignite

cd ${ignite_path}/examples/${bench_task}

echo "Download dataset"
python -c "${bench_prep_pycmd}"

echo "Run $bench_task"
time ${bench_cmd}

source deactivate
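The two save/resume check scripts below are meant to be run from ignite's examples/contrib/cifar10 folder, with the debug script from this gist (presumably saved as main_debug.py, the name referenced below) placed next to the example's main.py. A usage sketch, assuming the check script is saved as check_resume_cifar10.sh (the file name is an assumption; the path follows the ignite_path pattern used above):

    cd /tmp/ignite_pytorch_v0.2.1/examples/contrib/cifar10 && bash check_resume_cifar10.sh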
#!/bin/bash

set -e

echo -e "\n--- Check save/resume training on CIFAR10 dataset ---\n"

if [ ! -f "main.py" ]; then
    echo "This check should run from the examples/contrib/cifar10 folder"
    exit 1
fi

data_path="/data_deep/CLASSIC_DATASETS/cifar10/"
base_output_path="/tmp/cifar10-output"

with_debug_opts=1

if [ ${with_debug_opts} -eq 0 ]; then
    exec_script="main.py"
    no_augs_vals="False"
else
    exec_script="main_debug.py"
    no_augs_vals="False True"
    base_output_path=${base_output_path}-debug
fi

for crash_iter in 510 820 1000; do
    for n_proc in 1 2; do
        for no_augs in ${no_augs_vals}; do

            output_path="$base_output_path/$n_proc-no_augs_${no_augs}-crash_iter_${crash_iter}"

            echo -e "\nRUN training $exec_script with a crash at iteration $crash_iter, n_proc=$n_proc, no_augs=${no_augs}\n"
            if [ ${with_debug_opts} -eq 0 ]; then
                # no debug options
                python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';crash_iteration=$crash_iter;output_path=$output_path"
            else
                # debugging
                python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';crash_iteration=$crash_iter;output_path=$output_path;debug__no_augs=${no_augs};debug__display_data_stats=True;debug__crash_iteration=$crash_iter;debug__display_grads=True;debug__display_model_weights=True"
                # python -u ${exec_script} --params="data_path=$data_path;batch_size=512;crash_iteration=$crash_iter;output_path=$output_path;debug__no_augs=${no_augs};debug__display_data_stats=True;debug__crash_iteration=$crash_iter;debug__display_grads=True;debug__display_model_weights=True"
            fi

            chkpt=`find $output_path -name "training_checkpoint_*.pth"`

            echo -e "\nRESUME training $exec_script from a checkpoint: ${chkpt}, n_proc=$n_proc, no_augs=${no_augs}\n"
            if [ ${with_debug_opts} -eq 0 ]; then
                # no debug options
                python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';output_path=$output_path;resume_from=$chkpt"
            else
                # debugging
                python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';output_path=$output_path;resume_from=$chkpt;debug__no_augs=${no_augs};debug__display_data_stats=True;debug__crash_iteration=$crash_iter;debug__display_grads=True;debug__display_model_weights=True"
                # python -u ${exec_script} --params="data_path=$data_path;batch_size=512;output_path=$output_path;resume_from=$chkpt;debug__no_augs=${no_augs};debug__display_data_stats=True;debug__crash_iteration=$crash_iter;debug__display_grads=True;debug__display_model_weights=True"
            fi

            if [ ${with_debug_opts} -eq 0 ]; then
                echo -e "\nRUN training $exec_script without a crash, n_proc=$n_proc, no_augs=${no_augs}\n"
                # no debug options
                python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';output_path=$output_path"
            else
                exit 0
            fi
        done
    done
done
# TODO:
# - Problem: a resumed run does not see the same data from the checkpoint iteration onwards
#   => the loss curve is discontinuous
#   => the train/test accuracy curves are discontinuous
#   - This is due to the DistributedSampler
#   - If no DistributedSampler is used, the dataflow is the same but the loss curve is still discontinuous
#     <= this is due to the unrestored running average
# 1) Improve the engine to work with the DistributedSampler => DONE
# 2) Test with data augmentations => data indices are not the same
# 3) Test with non-deterministic cudnn to check whether there is a gap between the train/test accuracy curves
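# A rough illustration of what item (1) amounts to (a sketch/assumption, not code
# used by this script): the DistributedSampler must be told the current epoch
# before each epoch so that a resumed run re-shuffles the data in the same order,
# e.g.
#
#     @trainer.on(Events.EPOCH_STARTED)
#     def _set_distributed_sampler_epoch(engine):
#         train_loader.sampler.set_epoch(engine.state.epoch)
#
# Below, common.setup_common_training_handlers(trainer, train_sampler=...) is
# presumably what handles this; the remaining discontinuity of the smoothed loss
# right after a resume comes from the unrestored running average mentioned above.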
import argparse
from pathlib import Path

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.parallel
import torch.distributed as dist

import ignite
from ignite.engine import Events, Engine, create_supervised_evaluator
from ignite.metrics import Accuracy, Loss
from ignite.handlers import Checkpoint, global_step_from_engine
from ignite.utils import convert_tensor
from ignite.contrib.engines import common
from ignite.contrib.handlers import TensorboardLogger, ProgressBar
from ignite.contrib.handlers.tensorboard_logger import OutputHandler, OptimizerParamsHandler, GradsHistHandler
from ignite.contrib.handlers import PiecewiseLinear

from utils import set_seed, get_train_test_loaders, get_model
def run(output_path, config):
    device = "cuda"
    local_rank = config['local_rank']
    distributed = config['dist_backend'] is not None
    if distributed:
        torch.cuda.set_device(local_rank)
        device = "cuda"
    rank = dist.get_rank() if distributed else 0

    torch.manual_seed(config['seed'] + rank)

    # Rescale batch_size and num_workers
    ngpus_per_node = torch.cuda.device_count()
    ngpus = dist.get_world_size() if distributed else 1
    batch_size = config['batch_size'] // ngpus
    num_workers = int((config['num_workers'] + ngpus_per_node - 1) / ngpus_per_node)

    train_loader, test_loader = get_train_test_loaders(
        path=config['data_path'],
        batch_size=batch_size,
        distributed=distributed,
        num_workers=num_workers,
        no_augs=config['debug__no_augs'],
    )

    model = get_model(config['model'])
    model = model.to(device)

    if distributed:
        model = torch.nn.parallel.DistributedDataParallel(model,
                                                          device_ids=[local_rank, ],
                                                          output_device=local_rank)

    optimizer = optim.SGD(model.parameters(), lr=config['learning_rate'],
                          momentum=config['momentum'],
                          weight_decay=config['weight_decay'],
                          nesterov=True)

    criterion = nn.CrossEntropyLoss().to(device)

    le = len(train_loader)
    milestones_values = [
        (0, 0.0),
        (le * config['num_warmup_epochs'], config['learning_rate']),
        (le * config['num_epochs'], 0.0)
    ]
    lr_scheduler = PiecewiseLinear(optimizer, param_name="lr",
                                   milestones_values=milestones_values)

    def _prepare_batch(batch, device, non_blocking):
        x, y = batch
        return (convert_tensor(x, device=device, non_blocking=non_blocking),
                convert_tensor(y, device=device, non_blocking=non_blocking))

    def process_function(engine, batch):
        x, y = _prepare_batch(batch, device=device, non_blocking=True)

        model.train()
        # Supervised part
        y_pred = model(x)
        loss = criterion(y_pred, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        return {
            'batch loss': loss.item(),
        }

    trainer = Engine(process_function)
    train_sampler = train_loader.sampler if distributed else None
    to_save = {'trainer': trainer, 'model': model, 'optimizer': optimizer, 'lr_scheduler': lr_scheduler}
    metric_names = ['batch loss', ]
    common.setup_common_training_handlers(trainer, train_sampler=train_sampler,
                                          to_save=to_save, save_every_iters=config['checkpoint_every'],
                                          output_path=output_path, lr_scheduler=lr_scheduler,
                                          output_names=metric_names, with_pbar_on_iters=config['display_iters'],
                                          log_every_iters=10)

    if rank == 0:
        tb_logger = TensorboardLogger(log_dir=output_path)
        tb_logger.attach(trainer,
                         log_handler=OutputHandler(tag="train",
                                                   metric_names=metric_names),
                         event_name=Events.ITERATION_COMPLETED)
        tb_logger.attach(trainer,
                         log_handler=OptimizerParamsHandler(optimizer, param_name="lr"),
                         event_name=Events.ITERATION_STARTED)

    metrics = {
        "accuracy": Accuracy(device=device if distributed else None),
        "loss": Loss(criterion, device=device if distributed else None)
    }

    evaluator = create_supervised_evaluator(model, metrics=metrics, device=device, non_blocking=True)
    train_evaluator = create_supervised_evaluator(model, metrics=metrics, device=device, non_blocking=True)

    def run_validation(engine):
        torch.cuda.synchronize()
        train_evaluator.run(train_loader)
        evaluator.run(test_loader)

    trainer.add_event_handler(Events.EPOCH_STARTED(every=3), run_validation)
    trainer.add_event_handler(Events.COMPLETED, run_validation)

    if rank == 0:
        if config['display_iters']:
            ProgressBar(persist=False, desc="Train evaluation").attach(train_evaluator)
            ProgressBar(persist=False, desc="Test evaluation").attach(evaluator)

        tb_logger.attach(train_evaluator,
                         log_handler=OutputHandler(tag="train",
                                                   metric_names=list(metrics.keys()),
                                                   global_step_transform=global_step_from_engine(trainer)),
                         event_name=Events.COMPLETED)
        tb_logger.attach(evaluator,
                         log_handler=OutputHandler(tag="test",
                                                   metric_names=list(metrics.keys()),
                                                   global_step_transform=global_step_from_engine(trainer)),
                         event_name=Events.COMPLETED)

        # Store the best model by validation accuracy:
        common.save_best_model_by_val_score(output_path, evaluator, model=model, metric_name='accuracy', n_saved=3,
                                            trainer=trainer, tag="test")

        if config['log_model_grads_every'] is not None:
            tb_logger.attach(trainer,
                             log_handler=GradsHistHandler(model, tag=model.__class__.__name__),
                             event_name=Events.ITERATION_COMPLETED(every=config['log_model_grads_every']))

    if config['crash_iteration'] is not None:
        @trainer.on(Events.ITERATION_STARTED(once=config['crash_iteration']))
        def _(engine):
            raise Exception("STOP at iteration: {}".format(engine.state.iteration))

    def ef(_, event):
        # Fire on the first iterations and around every checkpointing iteration
        if event in [1, 2, 3]:
            return True
        elif event % config['checkpoint_every'] in [0, 1, 2]:
            return True
        return False

    if config['debug__display_data_stats']:
        trainer.add_event_handler(Events.ITERATION_COMPLETED(event_filter=ef), debug_print_data_stats)

    if config['debug__display_grads']:
        trainer.add_event_handler(Events.ITERATION_COMPLETED(event_filter=ef), debug_print_model_grads, model)

    if config['debug__display_model_weights']:
        trainer.add_event_handler(Events.STARTED, debug_print_model_weights, model)
        trainer.add_event_handler(Events.ITERATION_COMPLETED(event_filter=ef), debug_print_model_weights, model)

    trainer.add_event_handler(Events.ITERATION_COMPLETED(event_filter=ef), debug_print_batch_loss)

    # if config['debug__crash_iteration'] is not None:
    #     crash_iter = config['debug__crash_iteration']
    #
    #     def near_after_crash_iter(_, event):
    #         if crash_iter - 15 <= event <= crash_iter + 15:
    #             return True
    #         return False
    #
    #     if config['debug__display_data_stats']:
    #         trainer.add_event_handler(Events.ITERATION_COMPLETED(event_filter=near_after_crash_iter),
    #                                   debug_print_data_stats)
    #
    #     if config['debug__display_grads']:
    #         trainer.add_event_handler(Events.ITERATION_COMPLETED(event_filter=near_after_crash_iter),
    #                                   debug_print_model_grads, model)

    resume_from = config['resume_from']
    if resume_from is not None:
        checkpoint_fp = Path(resume_from)
        assert checkpoint_fp.exists(), "Checkpoint '{}' is not found".format(checkpoint_fp.as_posix())
        print("Resume from a checkpoint: {}".format(checkpoint_fp.as_posix()))
        checkpoint = torch.load(checkpoint_fp.as_posix())
        Checkpoint.load_objects(to_load=to_save, checkpoint=checkpoint)

    try:
        trainer.run(train_loader, max_epochs=config['num_epochs'], seed=config['seed'])
    except Exception as e:
        import traceback
        print(traceback.format_exc())

    if rank == 0:
        tb_logger.close()
def debug_print_model_weights(engine, model):
    # Print the norms of the first 5 parameters and their running sum
    output = {'total': 0.0}
    max_counter = 5
    for name, p in model.named_parameters():
        name = name.replace('.', '/')
        n = torch.norm(p)
        if max_counter > 0:
            output[name] = n
            output['total'] += n
            max_counter -= 1
    print("{} | {}: {}".format(
        engine.state.epoch,
        engine.state.iteration,
        " - ".join(["{}:{:.4f}".format(m, v) for m, v in output.items()]))
    )


def debug_print_model_grads(engine, model):
    # Print the gradient norms of the first 5 parameters with gradients and their running sum
    output = {'grads/total': 0.0}
    max_counter = 5
    for name, p in model.named_parameters():
        if p.grad is None:
            continue
        name = name.replace('.', '/')
        n = torch.norm(p.grad)
        if max_counter > 0:
            output['grads/{}'.format(name)] = n
            output['grads/total'] += n
            max_counter -= 1
    print("{} | {}: {}".format(
        engine.state.epoch,
        engine.state.iteration,
        " - ".join(["{}:{:.4f}".format(m, v) for m, v in output.items()]))
    )


def debug_print_data_stats(engine):
    x, y = engine.state.batch
    output = {
        'batch xmean': x.mean().item(),
        'batch xstd': x.std().item(),
        'batch ymedian': y.median().item(),
    }
    print("{} | {}: {}".format(
        engine.state.epoch,
        engine.state.iteration,
        " - ".join(["{}:{:.7f}".format(m, v) for m, v in output.items()]))
    )


def debug_print_batch_loss(engine):
    output = engine.state.output
    print("{} | {}: {}".format(
        engine.state.epoch,
        engine.state.iteration,
        " - ".join(["{}:{:.5f}".format(m, v) for m, v in output.items()]))
    )
if __name__ == "__main__":

    parser = argparse.ArgumentParser("Training a CNN on the CIFAR10 dataset")
    parser.add_argument('--network', type=str, default="fastresnet", help="Network to train")
    parser.add_argument('--params', type=str,
                        help='Override the default configuration with parameters: '
                             'data_path=/path/to/dataset;batch_size=64;num_workers=12 ...')
    parser.add_argument('--local_rank', type=int, help='Local process rank in distributed computation')

    args = parser.parse_args()
    network_name = args.network

    assert torch.cuda.is_available()
    # torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    batch_size = 512
    num_epochs = 24

    # Default configuration dictionary
    config = {
        "seed": 12,

        "data_path": "/tmp/cifar10",
        "output_path": "/tmp/cifar10-output",

        "model": network_name,

        "momentum": 0.9,
        "weight_decay": 1e-4,
        "batch_size": batch_size,
        "num_workers": 10,
        "num_epochs": num_epochs,

        "learning_rate": 0.04,
        "num_warmup_epochs": 4,

        # distributed settings
        "dist_url": "env://",
        "dist_backend": None,  # if None, the distributed option is disabled; set to "nccl" to enable it

        # Logging:
        "display_iters": True,
        "log_model_grads_every": None,
        "checkpoint_every": 200,

        # Crash/Resume training:
        "resume_from": None,  # path to a checkpoint file .pth
        "crash_iteration": None,

        "debug__no_augs": False,  # for debugging near and after crash_iteration
        "debug__crash_iteration": None,  # for debugging near and after crash_iteration
        "debug__display_data_stats": False,  # for debugging near and after crash_iteration
        "debug__display_grads": False,  # for debugging near and after crash_iteration
        "debug__display_model_weights": False,  # for debugging model weights
    }

    if args.local_rank is not None:
        config['local_rank'] = args.local_rank
    else:
        config['local_rank'] = 0

    # Override config:
    if args.params is not None:
        for param in args.params.split(";"):
            key, value = param.split("=")
            if "/" not in value:
                value = eval(value)
            config[key] = value

    backend = config['dist_backend']
    distributed = backend is not None

    if distributed:
        dist.init_process_group(backend, init_method=config['dist_url'])
        # let each node print the info
        if config['local_rank'] == 0:
            print("\nDistributed setting:")
            print("\tbackend: {}".format(dist.get_backend()))
            print("\tworld size: {}".format(dist.get_world_size()))
            print("\trank: {}".format(dist.get_rank()))
            print("\n")

    output_path = None
    # let each node print the info
    if config['local_rank'] == 0:
        print("Train {} on CIFAR10".format(network_name))
        print("- PyTorch version: {}".format(torch.__version__))
        print("- Ignite version: {}".format(ignite.__version__))
        print("- CUDA version: {}".format(torch.version.cuda))

        print("\n")
        print("Configuration:")
        for key, value in config.items():
            print("\t{}: {}".format(key, value))
        print("\n")

    # create the log directory only on one node
    if (not distributed) or (dist.get_rank() == 0):
        from datetime import datetime

        now = datetime.now().strftime("%Y%m%d-%H%M%S")
        gpu_conf = "-single-gpu"
        if distributed:
            ngpus_per_node = torch.cuda.device_count()
            nnodes = dist.get_world_size() // ngpus_per_node
            gpu_conf = "-distributed-{}nodes-{}gpus".format(nnodes, ngpus_per_node)

        output_path = Path(config['output_path']) / "{}{}".format(now, gpu_conf)
        if not output_path.exists():
            output_path.mkdir(parents=True)
        output_path = output_path.as_posix()
        print("Output path: {}".format(output_path))

    try:
        run(output_path, config)
    except KeyboardInterrupt:
        print("Caught KeyboardInterrupt -> exit")
    except Exception as e:
        if distributed:
            dist.destroy_process_group()
        raise e

    if distributed:
        dist.destroy_process_group()
#!/bin/bash

set -e

echo -e "\n--- Check save/resume training on CIFAR10 dataset ---\n"

if [ ! -f "main.py" ]; then
    echo "This check should run from the examples/contrib/cifar10 folder"
    exit 1
fi

data_path="/data_deep/CLASSIC_DATASETS/cifar10/"
base_output_path="/tmp/cifar10-output-1000"

exec_script="main.py"
no_augs_vals="False"

for crash_iter in 1000; do
    for n_proc in 2; do

        output_path="$base_output_path/$n_proc-crash_iter_${crash_iter}"

        echo -e "\nRUN training $exec_script with a crash at iteration $crash_iter, n_proc=$n_proc\n"
        python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';crash_iteration=$crash_iter;output_path=$output_path;validate_every=1"

        chkpt=`find $output_path -name "training_checkpoint_*.pth"`

        echo -e "\nRESUME training $exec_script from a checkpoint: ${chkpt}, n_proc=$n_proc\n"
        python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';output_path=$output_path;resume_from=$chkpt;validate_every=1"

        echo -e "\nRUN training $exec_script without a crash, n_proc=$n_proc\n"
        # no debug options
        python -u -m torch.distributed.launch --nproc_per_node=${n_proc} ${exec_script} --params="data_path=$data_path;batch_size=512;dist_backend='nccl';output_path=$output_path;validate_every=1"
    done
done