For basic applications, MPI is as easy to use as any other message-passing system. The sample scripts below contain the complete communications skeleton for a data-parallel (or embarrassingly parallel) problem using the mpi4py package.
Within the code is a description of the few functions needed to write typical parallel applications (a minimal stand-alone example is sketched just after the list below).
mpi-submit.py - Parallel application with simple partitioning: unbalanced load.
mpi-submit2.py - Parallel application with master/slave scheme: dynamically balanced load.
jobs.sh - Example of a PBS script to run the parallel jobs on an HPC cluster.
All the code is available here: https://github.com/fspaolo/mpisubmit
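If mpi4py is new to you, the handful of calls these scripts rely on (MPI.COMM_WORLD, Get_rank, Get_size, and the send/recv pair) fit in a few lines. The sketch below is only an illustration and is not part of the repository; the file name hello_mpi.py is made up:

#!/usr/bin/env python
# Minimal mpi4py sketch (illustration only): each worker sends its rank
# to rank 0, which receives one message per worker. Run with e.g.
#   mpiexec -n 4 python hello_mpi.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
my_rank = comm.Get_rank()      # id of this process (0 .. size-1)
num_procs = comm.Get_size()    # total number of processes

if my_rank == 0:
    for _ in range(1, num_procs):
        msg = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG)
        print('rank 0 received: %s' % msg)
else:
    comm.send('hello from rank %d' % my_rank, dest=0, tag=0)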
Script mpi-submit.py:
#!/usr/bin/env python
"""
Parallel application with simple partitioning (unbalanced load)
"""
# Fernando Paolo <[email protected]>
# Jan 15, 2013
import os
import sys
import numpy as np
from mpi4py import MPI
def simple_partitioning(length, num_procs):
    sublengths = [length/num_procs]*num_procs
    for i in range(length % num_procs):  # treatment of remainder
        sublengths[i] += 1
    return sublengths


def get_subproblem_input_args(input_args, my_rank, num_procs):
    sub_ns = simple_partitioning(len(input_args), num_procs)
    my_offset = sum(sub_ns[:my_rank])
    my_input_args = input_args[my_offset:my_offset+sub_ns[my_rank]]
    return my_input_args


def program_to_run(string):
    if '.py' in string:
        run = 'python '
    else:
        run = ''  # './'
    return run
comm = MPI.COMM_WORLD
my_rank = comm.Get_rank()
num_procs = comm.Get_size()
prog_and_args = sys.argv[1]  # program to run (plus any fixed arguments)
files_in = sys.argv[2:]      # input files to distribute among processes
run = program_to_run(prog_and_args)
my_files = get_subproblem_input_args(files_in, my_rank, num_procs)
os.system('%s%s %s' % (run, prog_and_args, ' '.join(my_files)))
print '%s%s %s' % (run, prog_and_args, ' '.join(my_files))
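As a quick sanity check of the partitioning (plain Python, no MPI launcher needed), assume the two functions above are pasted into a session along with ten dummy file names; the expected output is shown in the comments:

# Illustration only: how 10 inputs are split across 3 processes
files = ['f%d.txt' % i for i in range(10)]     # dummy file names
print(simple_partitioning(len(files), 3))      # -> [4, 3, 3]
print(get_subproblem_input_args(files, 0, 3))  # rank 0 -> ['f0.txt', ..., 'f3.txt']
print(get_subproblem_input_args(files, 2, 3))  # rank 2 -> ['f7.txt', 'f8.txt', 'f9.txt']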
Script mpi-submit2.py:
#!/usr/bin/env python
"""
For basic applications, MPI is as easy to use as any other
message-passing system. The sample code below contains the complete
communications skeleton for a dynamically load balanced master/slave
application. Following the code is a description of the few functions
necessary to write typical parallel applications.
important parameters
--------------------
status = MPI.Status()            # where all info is stored

# Receive results from a slave
result = comm.recv(              # message buffer
    source=MPI.ANY_SOURCE,       # receive from any sender (-1)
    tag=MPI.ANY_TAG,             # any type of message (-1)
    status=status)               # info about the received msg (class)

# Send the slave a new work unit
comm.send(work,                      # message buffer
          dest=status.Get_source(),  # send to the slave we just received from
          tag=WORKTAG)               # user-chosen message tag
"""
# Fernando Paolo <[email protected]>
# Jan 15, 2013
import os
import sys
import numpy as np
from mpi4py import MPI
from Queue import Queue
WORKTAG = 1
DIETAG = 0
class Work(object):
    def __init__(self, prog, files):
        # important: sort by file size in decreasing order!
        files.sort(key=lambda f: os.stat(f).st_size, reverse=True)
        q = Queue()
        for f in files:
            q.put(' '.join([prog, f]))
        self.work = q

    def get_next(self):
        if self.work.empty():
            return None
        return self.work.get()
def do_work(work):
    if '.py' in work:
        os.system('python ' + work)
    else:
        os.system(work)  # for './'
    return


def process_result(result):
    pass
def master(comm):
    num_procs = comm.Get_size()
    status = MPI.Status()

    # generate work queue
    wq = Work(sys.argv[1], sys.argv[2:])

    # Seed the slaves, send one unit of work to each slave (rank)
    for rank in xrange(1, num_procs):
        work = wq.get_next()
        comm.send(work, dest=rank, tag=WORKTAG)

    # Loop over getting new work requests until there is no more work to be done
    while True:
        work = wq.get_next()
        if not work: break

        # Receive results from a slave
        result = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        #process_result(result)

        # Send the slave a new work unit
        comm.send(work, dest=status.Get_source(), tag=WORKTAG)

    # No more work to be done, receive all outstanding results from slaves
    for rank in xrange(1, num_procs):
        result = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        #process_result(result)

    # Tell all the slaves to exit by sending an empty message with DIETAG
    for rank in xrange(1, num_procs):
        comm.send(0, dest=rank, tag=DIETAG)
def slave(comm):
    my_rank = comm.Get_rank()
    status = MPI.Status()

    while True:
        # Receive a message from the master
        work = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)

        # Check the tag of the received message
        if status.Get_tag() == DIETAG: break

        # Do the work
        result = do_work(work)

        # Send the result back
        comm.send(result, dest=0, tag=0)
def main():
    comm = MPI.COMM_WORLD
    my_rank = comm.Get_rank()
    my_name = MPI.Get_processor_name()
    #comm.Barrier()
    #start = MPI.Wtime()

    if my_rank == 0:
        master(comm)
    else:
        slave(comm)

    #comm.Barrier()
    #end = MPI.Wtime()
    #print 'time:', end - start


if __name__ == '__main__':
    main()
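Note that do_work returns None and process_result is an empty stub, so results are currently discarded. A hedged sketch of one way to use them, assuming you want the master to log failed commands (this is not in the original script), is below; it also requires uncommenting the process_result(result) calls in master:

# Sketch only (assumed extension, not the author's code): propagate the
# shell exit status back to the master and report failures there.
def do_work(work):
    if '.py' in work:
        status = os.system('python ' + work)
    else:
        status = os.system(work)
    return (work, status)          # result sent back to rank 0

def process_result(result):
    work, status = result
    if status != 0:
        print('FAILED (exit status %d): %s' % (status, work))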
Script jobs.sh:
#!/bin/bash
#PBS -A fpaolo # account name
#PBS -q hotel # queue name
#PBS -N test # job name
#PBS -o test.out # output file name
#PBS -e test.err # error file name
#PBS -l nodes=1:ppn=2 # nodes, processes per node
#PBS -l walltime=0:00:10 # requested time h:mm:ss
#PBS -M [email protected] # email to receive messages
#PBS -m abe # email on abort, begin, and end
#PBS -V # export environment variables to the job
# print information
echo 'PBS_JOBID:' $PBS_JOBID
echo 'PBS_O_WORKDIR:' $PBS_O_WORKDIR
echo 'PBS_O_QUEUE:' $PBS_O_QUEUE
echo 'PBS_NODEFILE:' $PBS_NODEFILE
# Change to the directory the job was submitted from
cd $PBS_O_WORKDIR
# run programs
mpiexec -v -machinefile $PBS_NODEFILE python -c "print 'hello world'"
#mpiexec -v -machinefile $PBS_NODEFILE python mpi-submit.py '/path/to/prog.py' ~/path/to/data/file.*
#
# notes
# -----
# mpi-submit.py - distributes the workload => map(prog.py, [f1, f2, ...])
# prog.py - program to run on each file independently
# file.* - data files to process in parallel
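For reference, prog.py can be any program that accepts one or more file names as command-line arguments. A hypothetical example (the processing step and the '.out' suffix are made up) might look like:

#!/usr/bin/env python
# Hypothetical prog.py (illustration only): read each file passed on the
# command line and write its column means to '<file>.out'.
import sys
import numpy as np

for fname in sys.argv[1:]:
    data = np.loadtxt(fname)                      # one whitespace-delimited data file
    means = np.atleast_1d(np.mean(data, axis=0))  # column means (1-d even for 1-column files)
    np.savetxt(fname + '.out', means)
    print('processed %s -> %s.out' % (fname, fname))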