Last active
December 28, 2023 10:29
-
-
Save bnlawrence/83d64fc401e8f24deb6a53feb6f6fc9d to your computer and use it in GitHub Desktop.
example of using mpi alltoall and alltoallv
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from mpi4py import MPI | |
import numpy as np | |
comm = MPI.COMM_WORLD | |
rank = comm.Get_rank() | |
nprocs = comm.Get_size() | |
DEBUG = 0 | |
def move_info(info2move): | |
""" | |
Send a list of arrays of integers to all ranks, | |
and receive arrays from all ranks. | |
""" | |
print('moving') | |
buffer = np.concatenate(info2move) | |
if DEBUG: | |
print('S', rank, [p for p in info2move]) | |
n_sending = len(buffer) | |
count = np.array([len(x) for x in info2move]) | |
displ = np.array([sum(count[:p]) for p in range(len(info2move))]) | |
if DEBUG: | |
print(f'r{rank}, c{count}, d{displ}') | |
print(f'On rank {rank} sending {count}') | |
# send my count to all processes | |
values = comm.alltoall(count) | |
print(f'On rank {rank} have got {values}') | |
n_receiving = sum(values) | |
print(f'Sending {n_sending} and receiving {n_receiving} on rank {rank}') | |
# now all processes know how much data they will get, | |
# and how much from each rank | |
r_buffer = np.zeros(n_receiving, dtype=np.uint32) | |
rdisp = np.array([sum(values[:p]) for p in range(len(values))]) | |
comm.Alltoallv([buffer, count, displ, MPI.UINT32_T], | |
[r_buffer, values, rdisp, MPI.UINT32_T]) | |
if DEBUG: | |
# surely there is a better way to do this: | |
slices = [(rdisp[i], rdisp[i] + values[i]) for i in range(len(values))] | |
results = [r_buffer[s:e] for s, e in slices] | |
# (but remember we only need to do it for this check, it's not needed | |
# in real life.) | |
print('R', rank, [p for p in results]) | |
# this should not have changed | |
assert np.all(info2move[rank][0:2] == results[rank][0:2]),f'Assertion failure for rank {rank}' | |
return r_buffer | |
def main(): | |
""" Toy example of establishing domains of interest within | |
a larger array and handing information between domains | |
""" | |
# let's create integers and distribute them between domains | |
n_all = 100 | |
if rank == 0: | |
people = np.arange(n_all, dtype=np.uint32) | |
# give a domain to each pe ... | |
a, r = divmod(n_all, nprocs) | |
count = np.array([a+1 if p < r else a for p in range(nprocs)]) | |
displ = np.array([sum(count[:p]) for p in range(nprocs)]) | |
else: | |
people = None | |
# initialize count on other processes | |
count = np.zeros(nprocs, dtype=np.int) | |
displ = None | |
# broadcast count | |
comm.Bcast(count, root=0) | |
# initialize domain on all processes | |
domain = np.zeros(count[rank],dtype=np.uint32) | |
comm.Scatterv([people, count, displ, MPI.UINT32_T], domain, root=0) | |
print(f'Domain {rank} has {len(domain)} people') | |
# Now let's pretend we want to send some information between domains | |
# and that information is just a bunch of integers. | |
# we want to send about 10% of the total number of integers. | |
# ... let's just proxy that by generating 2.5% in each domain, | |
# then sending that back (assuming we have 4 ranks for this toy test, so | |
if n_all < 1000: | |
i_percent = .25 | |
else: | |
i_percent = 0.025 | |
# Crucial that it is not the same number of people for everyone | |
# otherwise we'd not need to use scatterv. | |
# We use a list of numpy arrays for best match to our application usage. | |
totransfer = [] | |
for i in range(nprocs): | |
n2move = int(i_percent * len(domain) * np.random.uniform(0.9,1.1)) | |
totransfer.append(np.random.randint(0, len(domain), n2move, np.uint32)) | |
result = move_info(totransfer) | |
if __name__=="__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment