#!/usr/bin/env python
#
# allreduce_loop.py
#
# Showcases how to use Gloo collectives from Caffe2.
# For rendezvous, this example can use a shared filesystem, Redis, or MPI.
#
# To use a shared filesystem for rendezvous, use:
#
# python ./allreduce_loop.py \
#     --num_gpus 1 \
#     --distributed_transport tcp \
#     --distributed_interfaces eth0 \
#     --num_shards 2 \
#     --shard_id 0 \                  # Specify a different rank on every machine
#     --file_store_path /path/to/nfs/share \
#     --run_id 12345                  # Unique for each separate run
#
# To use Redis for rendezvous, use:
#
# python ./allreduce_loop.py \
#     --num_gpus 1 \
#     --distributed_transport ibverbs \
#     --distributed_interfaces mlx5_0 \
#     --gpu_direct \
#     --num_shards 2 \
#     --shard_id 0 \                  # Specify a different rank on every machine
#     --redis_host some.ip.address \
#     --redis_port 6380 \
#     --run_id 12345                  # Unique for each separate run
#
# To use MPI for rendezvous, use:
#
# mpirun python ./allreduce_loop.py \
#     --num_gpus 1 \
#     --distributed_transport ibverbs \
#     --distributed_interfaces mlx5_0 \
#     --gpu_direct
#
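# Note: this assumes a Caffe2 build with Gloo support (and with Redis or MPI
# support when using those rendezvous mechanisms).
#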
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import argparse
import logging
import os
import time

from functools import reduce  # reduce is not a builtin in Python 3

import numpy as np
from caffe2.python import core, workspace
from caffe2.proto import caffe2_pb2
logging.basicConfig()
log = logging.getLogger("allreduce_loop")
log.setLevel(logging.DEBUG)
def main():
    parser = argparse.ArgumentParser(
        description="Caffe2 rendezvous example",
    )
    parser.add_argument("--gpus", type=str,
                        help="Comma separated list of GPU devices to use")
    parser.add_argument("--num_gpus", type=int, default=1,
                        help="Number of GPU devices (instead of --gpus)")
    parser.add_argument("--num_shards", type=int, default=1,
                        help="Number of machines in distributed run")
    parser.add_argument("--shard_id", type=int, default=0,
                        help="Shard id.")
    parser.add_argument("--run_id", type=str,
                        help="Unique run identifier (e.g. uuid)")
    parser.add_argument("--redis_host", type=str,
                        help="Host of Redis server (for rendezvous)")
    parser.add_argument("--redis_port", type=int, default=6379,
                        help="Port of Redis server (for rendezvous)")
    parser.add_argument("--file_store_path", type=str, default="/tmp",
                        help="Path to directory to use for rendezvous")
    parser.add_argument("--distributed_transport", type=str, default="tcp",
                        help="Transport to use for distributed run [tcp|ibverbs]")
    parser.add_argument("--distributed_interfaces", type=str, default="",
                        help="Network interfaces to use for distributed run")
    parser.add_argument("--gpu_direct", default=False, action="store_true",
                        help="Use GPUDirect (if using ibverbs transport)")
    parser.add_argument("--iterations", type=int, default=100,
                        help="Number of iterations to run for")
    args = parser.parse_args()

    # Either use specified device list or generate one
    if args.gpus is not None:
        gpus = [int(x) for x in args.gpus.split(",")]
        num_gpus = len(gpus)
    else:
        gpus = list(range(args.num_gpus))
        num_gpus = args.num_gpus
    log.info("Running on GPUs: {}".format(gpus))

    num_shards = args.num_shards
    shard_id = args.shard_id
    store_handler = None
    rendezvous = None

    # Expect interfaces to be comma separated.
    # Use of multiple network interfaces is not yet complete,
    # so simply use the first one in the list.
    interfaces = args.distributed_interfaces.split(",")

    # Rendezvous using MPI when run with mpirun
    if os.getenv("OMPI_COMM_WORLD_SIZE") is not None:
        num_shards = int(os.getenv("OMPI_COMM_WORLD_SIZE", 1))
        shard_id = int(os.getenv("OMPI_COMM_WORLD_RANK", 0))
        if num_shards > 1:
            rendezvous = dict(
                kv_handler=None,
                num_shards=num_shards,
                shard_id=shard_id,
                engine="GLOO",
                transport=args.distributed_transport,
                interface=interfaces[0],
                mpi_rendezvous=True)
    elif num_shards > 1:
        # Create rendezvous for distributed computation
        store_handler = "store_handler"
        if args.redis_host is not None:
            # Use Redis for rendezvous if Redis host is specified
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "RedisStoreHandlerCreate", [], [store_handler],
                    host=args.redis_host,
                    port=args.redis_port,
                    prefix=args.run_id,
                )
            )
        else:
            # Use filesystem for rendezvous otherwise
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "FileStoreHandlerCreate", [], [store_handler],
                    path=args.file_store_path,
                    prefix=args.run_id,
                )
            )
        rendezvous = dict(
            kv_handler=store_handler,
            num_shards=num_shards,
            shard_id=shard_id,
            engine="GLOO",
            transport=args.distributed_transport,
            interface=interfaces[0])

    if rendezvous is None:
        raise RuntimeError("No rendezvous mechanism configured!")

    init_net = core.Net("init_net")
    shape = [32, 3, 224, 224]
    num_elements = reduce(lambda x, y: x * y, shape)
    num_bytes = num_elements * 4
    num_kilobytes = num_bytes / 1024.0
    num_megabytes = num_kilobytes / 1024.0
    num_gigabytes = num_megabytes / 1024.0
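    # For reference: [32, 3, 224, 224] is 4,816,896 float32 elements,
    # i.e. ~18.4 MB (~0.018 GB) per blob being reduced.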

    # Initialize N blobs, 1 per GPU
    blobs = []
    for gpu in gpus:
        with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, gpu)):
            blobs.append(
                init_net.UniformFill(
                    [],
                    [init_net.NextBlob("blob")],
                    shape=shape))

    # Create Gloo common world
    with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
        comm_world = init_net.CreateCommonWorld(
            [store_handler] if store_handler is not None else [],
            [init_net.NextBlob("comm_world")],
            name="first_and_only_common_world",
            size=rendezvous["num_shards"],
            rank=rendezvous["shard_id"],
            engine=rendezvous["engine"],
            transport=rendezvous["transport"],
            interface=rendezvous["interface"],
            mpi_rendezvous=rendezvous.get("mpi_rendezvous", False),
        )
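    # The common world blob holds the Gloo context (connections to the other
    # shards established through the rendezvous above); the collective ops
    # below take it as their first input.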

    # Initialize
    workspace.RunNetOnce(init_net)

    # Our main net just runs Allreduce in a loop
    main_net = core.Net("main_net")
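    # Allreduce sums the per-GPU blobs across the local GPUs and across all
    # shards, leaving every blob holding the same reduced result.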
    with core.DeviceScope(core.DeviceOption(caffe2_pb2.CUDA, 0)):
        main_net.Allreduce(
            inputs=[comm_world] + blobs,
            outputs=blobs,
            engine=rendezvous["engine"],
            gpu_direct=args.gpu_direct,
        )

    workspace.CreateNet(main_net)
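    # Time each run of the net. Note the reported GB/sec is the per-blob
    # payload divided by the wall-clock time of one Allreduce, not a measure
    # of the bytes actually moved on the wire.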
    for i in range(args.iterations):
        t1 = time.time()
        workspace.RunNet(main_net)
        t2 = time.time()
        if shard_id == 0:
            dt = (t2 - t1)
            print("Took {:.3f}s ({:.3f} GB/sec)".format(dt, num_gigabytes / dt))


if __name__ == "__main__":
    workspace.GlobalInit(["caffe2", "--caffe2_log_level=2"])
    main()