Skip to content

Instantly share code, notes, and snippets.

@bnlawrence
Last active December 28, 2023 10:29
Show Gist options
  • Save bnlawrence/83d64fc401e8f24deb6a53feb6f6fc9d to your computer and use it in GitHub Desktop.
Save bnlawrence/83d64fc401e8f24deb6a53feb6f6fc9d to your computer and use it in GitHub Desktop.
Example of using MPI alltoall and Alltoallv (via mpi4py) to exchange variable-length integer arrays between ranks.
from mpi4py import MPI
import numpy as np
# Module-level MPI context: every rank running this script joins the
# world communicator; rank/nprocs identify this process within it.
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()
# Set to 1 to print per-rank send/receive buffers and run a sanity check.
DEBUG = 0
def move_info(info2move):
    """
    Send a list of arrays of integers to all ranks,
    and receive arrays from all ranks.

    :param info2move: list with one uint32 array per destination rank;
        element p is the payload addressed to rank p.
    :return: flat uint32 array of everything received, ordered by source rank.
    """
    print('moving')
    # Flatten the per-destination payloads into one contiguous send buffer.
    buffer = np.concatenate(info2move)
    if DEBUG:
        print('S', rank, list(info2move))
    n_sending = len(buffer)
    # Element count destined for each rank, and where each slice starts.
    count = np.array([len(part) for part in info2move])
    displ = np.array([count[:p].sum() for p in range(len(info2move))])
    if DEBUG:
        print(f'r{rank}, c{count}, d{displ}')
    print(f'On rank {rank} sending {count}')
    # First exchange only the counts, so every rank can size its receive side.
    values = comm.alltoall(count)
    print(f'On rank {rank} have got {values}')
    n_receiving = sum(values)
    print(f'Sending {n_sending} and receiving {n_receiving} on rank {rank}')
    # Every rank now knows how much data it will get, and from whom,
    # so the variable-length exchange can proceed.
    r_buffer = np.zeros(n_receiving, dtype=np.uint32)
    rdisp = np.array([sum(values[:q]) for q in range(len(values))])
    comm.Alltoallv([buffer, count, displ, MPI.UINT32_T],
                   [r_buffer, values, rdisp, MPI.UINT32_T])
    if DEBUG:
        # Rebuild the per-source-rank views purely for this sanity check;
        # real callers work with the flat buffer directly.
        results = [r_buffer[s:s + c] for s, c in zip(rdisp, values)]
        print('R', rank, list(results))
        # The slice we addressed to ourselves must come back unchanged.
        assert np.all(info2move[rank][0:2] == results[rank][0:2]), f'Assertion failure for rank {rank}'
    return r_buffer
def main():
    """ Toy example of establishing domains of interest within
    a larger array and handing information between domains.

    Rank 0 builds an array of n_all integers and Scatterv's an uneven
    share to every rank; each rank then fabricates some per-domain data
    and exchanges it with all other ranks via move_info().
    """
    # let's create integers and distribute them between domains
    n_all = 100
    if rank == 0:
        people = np.arange(n_all, dtype=np.uint32)
        # give a domain to each pe: the first r ranks get one extra element
        a, r = divmod(n_all, nprocs)
        # dtype pinned to int64 so the Bcast send buffer matches the
        # receive buffer allocated on the other ranks below
        count = np.array([a + 1 if p < r else a for p in range(nprocs)],
                         dtype=np.int64)
        displ = np.array([sum(count[:p]) for p in range(nprocs)])
    else:
        people = None
        # initialize count on other processes
        # (np.int was removed in NumPy 1.24; use an explicit width that
        # matches the root's send buffer)
        count = np.zeros(nprocs, dtype=np.int64)
        displ = None
    # broadcast count so every rank knows its own domain size
    comm.Bcast(count, root=0)
    # initialize domain on all processes
    domain = np.zeros(count[rank], dtype=np.uint32)
    comm.Scatterv([people, count, displ, MPI.UINT32_T], domain, root=0)
    print(f'Domain {rank} has {len(domain)} people')
    # Now let's pretend we want to send some information between domains
    # and that information is just a bunch of integers.
    # we want to send about 10% of the total number of integers.
    # ... let's just proxy that by generating 2.5% in each domain,
    # then sending that back (assuming we have 4 ranks for this toy test, so
    if n_all < 1000:
        i_percent = .25
    else:
        i_percent = 0.025
    # Crucial that it is not the same number of people for everyone
    # otherwise we'd not need to use scatterv.
    # We use a list of numpy arrays for best match to our application usage.
    totransfer = []
    for _ in range(nprocs):
        # jitter the payload size +/-10% so no two ranks send the same amount
        n2move = int(i_percent * len(domain) * np.random.uniform(0.9, 1.1))
        totransfer.append(np.random.randint(0, len(domain), n2move, np.uint32))
    result = move_info(totransfer)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment