test_cross_gpu
#!/bin/bash
#------- Job description -------
#SBATCH --job-name='test_cross_gpu'
#SBATCH --qos=debug
#------- Notifications -------
#SBATCH [email protected]
#SBATCH --mail-type=END,FAIL,TIME_LIMIT_80
#------- Parametrization -------
#
# In order to use GPU devices on BSC it
# is mandatory to allocate 40 CPUs per GPU
# requested.
# You specify the gres configuration PER-NODE
# for a job with the --gres flag and a number
# of GPUs. For instance, with --gres=gpu:4 we
# request four GPUs per node (the maximum number
# of GPUs per node on this machine, CTE-POWER at BSC).
# Then, if you also set --nodes=2, you'll request
# 2*4 = 8 GPUs and 2*4*40 = 320 CPUs in total.
#
# Remember that the term "task" in this context can
# be thought of as a "process". In Slurm, tasks
# are requested with the --ntasks flag. CPUs, for
# multithreaded programs, are requested with the
# --cpus-per-task flag (see the commented two-node
# example below).
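#
# A sketch of the two-node variant described above (not used by this
# job; the directives that follow request a single node):
#
#   #SBATCH --nodes=2
#   #SBATCH --gres=gpu:4            # 4 GPUs on each node
#   #SBATCH --ntasks-per-node=1     # one task (process) per node
#   #SBATCH --cpus-per-task=160     # 4 GPUs/node * 40 CPUs/GPU
#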
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=160
#SBATCH --gres=gpu:4
#SBATCH --time=00:15:00
#------- Input/Output -------
#SBATCH -D .
#SBATCH --output=test_cross_gpu_%j.out
#SBATCH --error=test_cross_gpu_%j.err
#------- Module loading -------
module purge
module load gcc/8.3.0 cuda/10.1 cudnn/7.6.4 nccl/2.4.8 tensorrt/6.0.1 openmpi/4.0.1 atlas/3.10.3 scalapack/2.0.2 fftw/3.3.8 szip/2.1.1 ffmpeg/4.2.1 opencv/4.1.1 python/3.7.4_ML
echo "== Starting run at $(date)" | |
echo "== Job ID: ${SLURM_JOBID}" | |
echo "== Job NPROCS: ${SLURM_NPROCS}" | |
echo "== Job NNODES: ${SLURM_NNODES}" | |
echo "== Node list: ${SLURM_NODELIST}" | |
echo "== Submit dir. : ${SLURM_SUBMIT_DIR}" | |
#------- Comando ------ | |
srun test_cross_gpu_logprob.py |
test_cross_gpu_logprob.py
#!/apps/PYTHON/3.7.4_ML/bin/python
from __future__ import absolute_import, division, print_function, unicode_literals
import sys
import os
# os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"
# os.environ['CUDA_VISIBLE_DEVICES'] = "0,1,2,3"
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
tfb, tfd = tfp.bijectors, tfp.distributions
print("TF version: ", tf.__version__)
print("TFP: ", tfp.__version__)
resolver = tf.distribute.cluster_resolver.SlurmClusterResolver()
st = tf.distribute.experimental.MultiWorkerMirroredStrategy(cluster_resolver=resolver)
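# SlurmClusterResolver builds the cluster specification from the SLURM
# environment variables that srun sets for each task, and
# MultiWorkerMirroredStrategy replicates variables on every GPU of every
# worker, keeping them in sync with collective all-reduce ops.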
# Sanity check to observe which device has calculations
# tf.debugging.set_log_device_placement(True)
# ------------------------
# Setting up physical GPUs
# ------------------------
#
# No need for the previous os.environ['CUDA...'] lines; the system recognizes the GPUs on its own.
# physical_gpus = tf.config.list_physical_devices('GPU')
# print(len(physical_gpus), "Physical GPUs")
# gpus = tf.config.experimental.list_physical_devices('GPU')
# if gpus:
#     # Restrict TensorFlow to only use the first GPU
#     try:
#         # Currently, memory growth needs to be the same across GPUs
#         for gpu in gpus:
#             tf.config.experimental.set_memory_growth(gpu, True)
#         logical_gpus = tf.config.experimental.list_logical_devices('GPU')
#         print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU")
#     except RuntimeError as e:
#         # Visible devices must be set before GPUs have been initialized
#         print(e)
# st = tf.distribute.MirroredStrategy()
# strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
# -----------------------
# Setting up logical GPUs
# -----------------------
#
# GPU_SIZE = 6  # (G) Memory size per GPU
# for gpu in physical_gpus:
#     tf.config.experimental.set_virtual_device_configuration(
#         gpu,
#         [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=GPU_SIZE*1024)]*2)
# logical_gpus = tf.config.list_logical_devices('GPU')
# print(len(logical_gpus), "Logical GPUs")
# st = tf.distribute.MirroredStrategy(
#     devices=tf.config.list_logical_devices('GPU'))
# st = tf.distribute.MirroredStrategy()
# Sanity check
print('Number of devices in the strategy: {}'.format(st.num_replicas_in_sync))
print('Worker devices: {}'.format(st.extended.worker_devices))
# sys.exit()
# -----------------
# Model and dataset
# -----------------
#
# Draw samples from an MVN, then sort them. This way we can easily visually
# verify the correct partition ends up on the correct GPUs.
ndim = 3
def model():
    Root = tfd.JointDistributionCoroutine.Root
    loc = yield Root(tfb.Shift(.5)(tfd.MultivariateNormalDiag(loc=tf.zeros([ndim]))))
    scale_tril = yield Root(tfb.FillScaleTriL()(tfd.MultivariateNormalDiag(loc=tf.zeros([ndim * (ndim + 1) // 2]))))
    yield tfd.MultivariateNormalTriL(loc=loc, scale_tril=scale_tril)
dist = tfd.JointDistributionCoroutine(model)
tf.random.set_seed(1)
loc, scale_tril, _ = dist.sample(seed=2)
samples = dist.sample(value=([loc] * 1024, scale_tril, None), seed=3)[2]
samples = tf.round(samples * 1000) / 1000
for dim in reversed(range(ndim)):
    samples = tf.gather(samples, tf.argsort(samples[:, dim]))
# print(len(samples))
# print(loc)
# print(scale_tril)
# print(tf.reduce_mean(samples, 0))
# sys.exit()
def dataset_fn(ctx):
    # ctx is a tf.distribute.InputContext: split the global batch across
    # replicas and give each input pipeline its own shard of the samples.
    batch_size = ctx.get_per_replica_batch_size(len(samples))
    d = tf.data.Dataset.from_tensor_slices(samples).batch(batch_size)
    return d.shard(ctx.num_input_pipelines, ctx.input_pipeline_id)
ds = st.experimental_distribute_datasets_from_function(dataset_fn)
# Sanity check playing with dataset from function
# Toy 1
def replica_fn(arg):
    return tf.reduce_mean(arg, 0)
for batch in ds:
    replica1 = st.run(replica_fn, args=(batch,))
    print(replica1)
# Toy 2
# iterator = iter(ds)
# @tf.function(input_signature=[iterator.element_spec])
# def replica_fn2(arg):
#     return tf.reduce_mean(arg, 0)
# replica2 = st.run(replica_fn2, args=(next(iterator),))
# print(replica2)
# replicas_sum = st.reduce(tf.distribute.ReduceOp.SUM, replica1)
# print(replicas_sum)
# sys.exit()
observations = next(iter(ds))
# print(observations)
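# log_prob_and_grad runs on every replica: it computes the (normalized)
# log-probability of that replica's shard of the observations and its
# gradient, then all-reduces both across replicas so each replica ends up
# with the global values. target_log_prob wraps this in st.run and exposes
# a single-device view with a custom gradient, so tfp.math.value_and_gradient
# (and tfp.mcmc below) can treat it as an ordinary target log-prob function.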
# @tf.function(autograph=False)
def log_prob_and_grad(loc, scale_tril, observations):
    ctx = tf.distribute.get_replica_context()
    with tf.GradientTape() as tape:
        tape.watch((loc, scale_tril))
        lp = tf.reduce_sum(dist.log_prob(loc, scale_tril, observations)) / len(samples)
    grad = tape.gradient(lp, (loc, scale_tril))
    return ctx.all_reduce('sum', lp), [ctx.all_reduce('sum', g) for g in grad]
@tf.function(autograph=False)
@tf.custom_gradient
def target_log_prob(loc, scale_tril):
    lp, grads = st.run(log_prob_and_grad, (loc, scale_tril, observations))
    return lp.values[0], lambda grad_lp: [grad_lp * g.values[0] for g in grads]
singleton_vals = tfp.math.value_and_gradient(target_log_prob, (loc, scale_tril))
print(singleton_vals)
print("*"*50)
sys.exit()
kernel = tfp.mcmc.HamiltonianMonteCarlo(target_log_prob, step_size=.35, num_leapfrog_steps=2)
kernel = tfp.mcmc.TransformedTransitionKernel(kernel, bijector=[tfb.Identity(), tfb.FillScaleTriL()])
@tf.function(autograph=False)
def sample_chain():
    return tfp.mcmc.sample_chain(
        num_results=200, num_burnin_steps=100,
        current_state=[tf.ones_like(loc), tf.linalg.eye(scale_tril.shape[-1])],
        kernel=kernel, trace_fn=lambda _, kr: kr.inner_results.is_accepted)
samps, is_accepted = sample_chain()
print(f'accept rate: {np.mean(is_accepted)}')
print(f'ess: {tfp.mcmc.effective_sample_size(samps)}')
print(tf.reduce_mean(samps[0], axis=0))
print(tf.reduce_mean(samps[1], axis=0))