#!/usr/bin/env python
#
# allreduce_loop.py
#
# Showcase for how to use Gloo collectives from Caffe2.
# For rendezvous, this example can use a shared filesystem, Redis, or MPI.
#
# To use a shared filesystem for rendezvous, run:
#
#   python ./allreduce_loop.py \
#       --num_gpus 1 \
#       --distributed_transport tcp \
#       --distributed_interfaces eth0 \
#       --num_shards 2 \
#       --shard_id 0 \
#       --file_store_path /path/to/nfs/share \
#       --run_id 12345
#
#   (Specify a different --shard_id on every machine, and a --run_id that is
#   unique for each separate run.)
#
# To use Redis for rendezvous, run:
#
#   python ./allreduce_loop.py \
#       --num_gpus 1 \
#       --distributed_transport ibverbs \
#       --distributed_interfaces mlx5_0 \
#       --gpu_direct \
#       --num_shards 2 \
#       --shard_id 0 \
#       --redis_host some.ip.address \
#       --redis_port 6380 \
#       --run_id 12345
#
#   (Again, specify a different --shard_id on every machine and a unique
#   --run_id for each separate run.)
#
# To use MPI for rendezvous, run:
#
#   mpirun python ./allreduce_loop.py \
#       --num_gpus 1 \
#       --distributed_transport ibverbs \
#       --distributed_interfaces mlx5_0 \
#       --gpu_direct
#
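# Note: the Gloo collective ops used below (CreateCommonWorld, Allreduce)
# require a Caffe2 build with Gloo support; the Redis, MPI, and ibverbs
# options additionally require the corresponding support to be compiled in.
#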
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import logging
import numpy as np
import time
import os

# reduce is a builtin in Python 2 but must be imported from functools in Python 3.
from functools import reduce

from caffe2.python import core, workspace
from caffe2.proto import caffe2_pb2

logging.basicConfig()
log = logging.getLogger("allreduce_loop")
log.setLevel(logging.DEBUG)
def main():
    parser = argparse.ArgumentParser(
        description="Caffe2 rendezvous example",
    )
    parser.add_argument("--gpus", type=str,
                        help="Comma separated list of GPU devices to use")
    parser.add_argument("--num_gpus", type=int, default=1,
                        help="Number of GPU devices (instead of --gpus)")
    parser.add_argument("--num_shards", type=int, default=1,
                        help="Number of machines in distributed run")
    parser.add_argument("--shard_id", type=int, default=0,
                        help="Shard id.")
    parser.add_argument("--run_id", type=str,
                        help="Unique run identifier (e.g. uuid)")
    parser.add_argument("--redis_host", type=str,
                        help="Host of Redis server (for rendezvous)")
    parser.add_argument("--redis_port", type=int, default=6379,
                        help="Port of Redis server (for rendezvous)")
    parser.add_argument("--file_store_path", type=str, default="/tmp",
                        help="Path to directory to use for rendezvous")
    parser.add_argument("--distributed_transport", type=str, default="tcp",
                        help="Transport to use for distributed run [tcp|ibverbs]")
    parser.add_argument("--distributed_interfaces", type=str, default="",
                        help="Network interfaces to use for distributed run")
    parser.add_argument("--gpu_direct", default=False, action="store_true",
                        help="Use GPUDirect (if using ibverbs transport)")
    parser.add_argument("--iterations", type=int, default=100,
                        help="Number of iterations to run for")
    args = parser.parse_args()
    # Either use specified device list or generate one
    if args.gpus is not None:
        gpus = [int(x) for x in args.gpus.split(",")]
        num_gpus = len(gpus)
    else:
        gpus = list(range(args.num_gpus))
        num_gpus = args.num_gpus
    log.info("Running on GPUs: {}".format(gpus))

    num_shards = args.num_shards
    shard_id = args.shard_id
    store_handler = None
    rendezvous = None  # set by one of the rendezvous branches below

    # Expect interfaces to be comma separated.
    # Use of multiple network interfaces is not yet complete,
    # so simply use the first one in the list.
    interfaces = args.distributed_interfaces.split(",")
    # Rendezvous using MPI when run with mpirun
    if os.getenv("OMPI_COMM_WORLD_SIZE") is not None:
        num_shards = int(os.getenv("OMPI_COMM_WORLD_SIZE", 1))
        shard_id = int(os.getenv("OMPI_COMM_WORLD_RANK", 0))
        if num_shards > 1:
            rendezvous = dict(
                kv_handler=None,
                num_shards=num_shards,
                shard_id=shard_id,
                engine="GLOO",
                transport=args.distributed_transport,
                interface=interfaces[0],
                mpi_rendezvous=True)
    elif num_shards > 1:
        # Create rendezvous for distributed computation
        store_handler = "store_handler"
        if args.redis_host is not None:
            # Use Redis for rendezvous if Redis host is specified
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "RedisStoreHandlerCreate", [], [store_handler],
                    host=args.redis_host,
                    port=args.redis_port,
                    prefix=args.run_id,
                )
            )
        else:
            # Use filesystem for rendezvous otherwise
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "FileStoreHandlerCreate", [], [store_handler],
                    path=args.file_store_path,
                    prefix=args.run_id,
                )
            )
        rendezvous = dict(
            kv_handler=store_handler,
            num_shards=num_shards,
            shard_id=shard_id,
            engine="GLOO",
            transport=args.distributed_transport,
            interface=interfaces[0])
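    # At this point `rendezvous` describes how this shard finds its peers:
    # either through a shared key/value store (Redis or a shared file path)
    # or through MPI when launched with mpirun.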
    if rendezvous is None:
        raise RuntimeError("No rendezvous mechanism configured!")

    init_net = core.Net("init_net")
    shape = [32, 3, 224, 224]
    num_elements = reduce(lambda x, y: x * y, shape)
    num_bytes = num_elements * 4
    num_kilobytes = num_bytes / 1024.0
    num_megabytes = num_kilobytes / 1024.0
    num_gigabytes = num_megabytes / 1024.0
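    # Each blob holds 32*3*224*224 float32 values (4 bytes each, ~18.4 MiB);
    # num_gigabytes is used below to report per-iteration allreduce throughput.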
    # Initialize N blobs, 1 per GPU
    blobs = []
    for gpu in gpus:
        with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, gpu)):
            blobs.append(
                init_net.UniformFill(
                    [],
                    [init_net.NextBlob("blob")],
                    shape=shape))
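    # (UniformFill fills each blob with random float32 values; the contents
    # don't matter for this benchmark, only the tensor size does.)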
    # Create Gloo common world
    with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
        comm_world = init_net.CreateCommonWorld(
            [store_handler] if store_handler is not None else [],
            [init_net.NextBlob("comm_world")],
            name="first_and_only_common_world",
            size=rendezvous["num_shards"],
            rank=rendezvous["shard_id"],
            engine=rendezvous["engine"],
            transport=rendezvous["transport"],
            interface=rendezvous["interface"],
            mpi_rendezvous=rendezvous.get("mpi_rendezvous", False),
        )
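    # CreateCommonWorld performs the actual rendezvous: every shard connects to
    # its peers through the store handler (or through MPI), and the resulting
    # communicator blob is passed to every collective op that follows.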
    # Initialize
    workspace.RunNetOnce(init_net)

    # Our main net just loops on Allreduce
    main_net = core.Net("main_net")
    with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
        main_net.Allreduce(
            inputs=[comm_world] + blobs,
            outputs=blobs,
            engine=rendezvous["engine"],
            gpu_direct=args.gpu_direct,
        )
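    # Allreduce sums the blobs element-wise across all GPUs and all shards and
    # writes the result back into the same blobs (outputs == inputs). With
    # --gpu_direct and the ibverbs transport, Gloo moves GPU memory directly
    # over the network (GPUDirect) instead of staging through host memory.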
    workspace.CreateNet(main_net)
    for i in range(args.iterations):
        t1 = time.time()
        workspace.RunNet(main_net)
        t2 = time.time()
        if shard_id == 0:
            dt = t2 - t1
            print("Took {:.3f}s ({:.3f} GB/sec)".format(dt, num_gigabytes / dt))


if __name__ == "__main__":
    workspace.GlobalInit(["caffe2", "--caffe2_log_level=2"])
    main()