test_multi_gpu_mpi.py
from __future__ import print_function
'''
Basic multi-GPU computation example using the TensorFlow library.
Single-/multi-GPU (non-MPI) author: Aymeric Damien
Multi-GPU large-scale / multi-node MPI author: Aymen Alsaadi
Project: https://github.com/aymericdamien/TensorFlow-Examples/
'''
'''
This tutorial requires your machine to have 2 GPUs.
"/cpu:0": the CPU of your machine.
"/gpu:0": the first GPU of your machine.
"/gpu:1": the second GPU of your machine.
Every MPI worker takes control of 2 GPUs.
To run, execute: mpirun -n 2 python test_multi_gpu_mpi.py
-n: number of ranks (rank 0 is the master, rank 1 is the worker that drives the 2 GPUs).
'''
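# Multi-node example (illustrative, not from the original gist): with 2 GPUs per
# node and a hostfile that places one worker rank per node, one master plus two
# workers could be launched with Open MPI as:
#   mpirun -n 3 --hostfile hosts python test_multi_gpu_mpi.py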
import sys
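# matpow() below builds the graph recursively, one level per multiplication
# (n = 5000), so raise Python's default recursion limit of 1000.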
sys.setrecursionlimit(10**6)
import numpy as np
#import tensorflow as tf
import tensorflow.compat.v1 as tf
import datetime
from mpi4py import MPI
tf.disable_v2_behavior()
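# Optional helper (not part of the original gist): call this to verify that
# TensorFlow can actually see the two GPUs this example assumes ("/gpu:0"
# and "/gpu:1") before building the graph.
def list_visible_gpus():
    from tensorflow.python.client import device_lib
    return [d.name for d in device_lib.list_local_devices()
            if d.device_type == 'GPU']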
# Log the device each operation is placed on
log_device_placement = True
# Message tags: WORKTAG hands out work, DIETAG tells a worker to exit
WORKTAG = 1
DIETAG = 0
# Num of multiplications to perform
n = 5000
'''
Example: compute A^n + B^n on 2 GPUs.
Result on 16 cores with 2 Quadro RTX 5000 GPUs,
with the current input size (5000 x 5000 matrices):
* Multi-GPU computation time: ~ 4 minutes (0:04:00)
'''
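# Master/worker protocol: rank 0 sends the same 5000x5000 matrix to every worker
# rank as its work item, collects one timing result per worker, and finally
# sends an empty message tagged DIETAG so the workers exit their receive loop.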
def master(comm):
    print("Master starts working ...")
    num_procs = comm.Get_size()
    status = MPI.Status()
    # Create a random large matrix
    A = np.random.rand(5000, 5000).astype('float32')
    for rank in range(1, num_procs):
        work = A
        comm.send(work, dest=rank, tag=WORKTAG)
    # No more work to be done; receive all outstanding results from the workers
    for rank in range(1, num_procs):
        result = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        print("Multi GPU computation time: " + str(result))
    # Tell all the workers to exit by sending an empty message with DIETAG
    for rank in range(1, num_procs):
        comm.send(0, dest=rank, tag=DIETAG)
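# matpow() builds an explicit chain of tf.matmul ops in the graph; because the
# base case returns M itself, matpow(M, n) multiplies n + 1 copies of M together.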
def matpow(M, n):
    if n < 1:  # Abstract cases where n < 1
        return M
    else:
        return tf.matmul(M, matpow(M, n - 1))
def worker(comm):
    '''
    Multi-GPU computing
    '''
    print("Worker starts working ...")
    my_rank = comm.Get_rank()
    my_name = MPI.Get_processor_name()
    status = MPI.Status()
    # Graph nodes holding the per-GPU results
    c2 = []
    while True:
        work = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
        # Check the tag of the received message
        if status.Get_tag() == DIETAG:
            break
        A = work
        B = work
        # GPU:0 computes A^n
        with tf.device('/gpu:0'):
            # Compute A^n and store the result in c2
            a = tf.placeholder(tf.float32, [5000, 5000])
            c2.append(matpow(a, n))
        # GPU:1 computes B^n
        with tf.device('/gpu:1'):
            # Compute B^n and store the result in c2
            b = tf.placeholder(tf.float32, [5000, 5000])
            c2.append(matpow(b, n))
        with tf.device('/cpu:0'):
            # Addition of all elements in c2, i.e. A^n + B^n
            sum_op = tf.add_n(c2)
        t1_2 = datetime.datetime.now()
        with tf.Session(config=tf.ConfigProto(log_device_placement=log_device_placement)) as sess:
            # Run the op.
            sess.run(sum_op, {a: A, b: B})
        t2_2 = datetime.datetime.now()
        result = str(t2_2 - t1_2)
        comm.send(result, dest=0, tag=0)
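# run() dispatches ranks (rank 0 -> master, all others -> workers) and brackets
# the whole run with barriers and MPI.Wtime() for wall-clock timing.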
def run(comm):
    my_rank = comm.Get_rank()
    my_name = MPI.Get_processor_name()
    # main function
    comm.Barrier()
    start = MPI.Wtime()
    if my_rank == 0:
        master(comm)
    else:
        worker(comm)
    comm.Barrier()
    end = MPI.Wtime()

if __name__ == '__main__':
    run(MPI.COMM_WORLD)