Skip to content

Instantly share code, notes, and snippets.

@AymenFJA
Last active April 23, 2020 02:30
Show Gist options
  • Save AymenFJA/3ef7caf69b261ee16afa2b7c7f5e9db8 to your computer and use it in GitHub Desktop.
Save AymenFJA/3ef7caf69b261ee16afa2b7c7f5e9db8 to your computer and use it in GitHub Desktop.
import shutil
import argparse
import os, os.path
from mpi4py import MPI
from queue import Queue
import iwp_divideimg as divide
# Command-line interface: three required positional arguments.  They are
# parsed once, at module level, and the functions below read them from the
# module-global ``args``.
parser = argparse.ArgumentParser()
parser.add_argument(
    'imgs_path',
    help='Path of the dataset',
)
parser.add_argument(
    'worker_root',
    help='main work dir',
)
parser.add_argument(
    'crop_size',
    type=int,
    help='Size of divided image',
)
args = parser.parse_args()
class Work(object):
    """Queue of file paths to process, handed out largest file first.

    Attributes:
        work: ``queue.Queue`` holding the remaining paths.
    """

    def __init__(self, files):
        """Put all work in a queue to be consumed.

        Args:
            files: Iterable of paths to existing files.  The iterable is
                not modified; a sorted copy is queued instead.

        Raises:
            OSError: If the size of any path cannot be determined.
        """
        # Important: queue the files by size in decreasing order.
        by_size_desc = sorted(files, key=os.path.getsize, reverse=True)
        q = Queue()
        for f in by_size_desc:
            q.put(f)
        self.work = q

    def get_next(self):
        """Return the next queued path, or ``None`` when nothing is left."""
        if self.work.empty():
            return None
        return self.work.get()
# MPI message tags used for messages sent from the master to the workers.
WORKTAG = 1  # the message payload is a work item to process
DIETAG = 0  # the receiving worker should leave its loop and exit
def _reset_dir(path):
    """Leave ``path`` as an existing, empty directory (parents are created)."""
    try:
        shutil.rmtree(path)
    except FileNotFoundError:
        # Nothing left over from an earlier run; just create it below.
        pass
    os.makedirs(path, exist_ok=True)


def processing_img(rank, name, work):
    """Prepare per-image work directories and divide one image into tiles.

    Only the base name of ``work`` is used: the image itself is read from
    ``<worker_root>/input_img/<base name>``, where ``worker_root`` comes from
    the command-line arguments.  For that image a fresh, empty sub-directory
    (named after the file name up to ``.tif``) is created under each of
    ``divided_img``, ``output_shp`` and ``final_shp``; any previous content
    of those sub-directories is removed.  The image is then passed to
    ``divide.divide_image`` together with the ``divided_img`` sub-directory
    and the crop size.

    Args:
        rank: MPI rank of the calling worker (currently unused).
        name: Processor name of the calling worker (currently unused).
        work: Path of the image to process; ``/`` separated.

    Returns:
        A 5-tuple ``(x_resolution, y_resolution, worker_divided_img_root,
        worker_output_shp_root, worker_finaloutput_root)`` where the first
        two items are whatever ``divide.divide_image`` returned and the last
        three are the top-level (not per-image) directories.

    Raises:
        OSError: If a work directory cannot be removed or created.
    """
    worker_root = str(args.worker_root)
    crop_size = args.crop_size

    worker_img_root = os.path.join(worker_root, "input_img")
    worker_divided_img_root = os.path.join(worker_root, "divided_img")
    worker_output_shp_root = os.path.join(worker_root, "output_shp")
    worker_finaloutput_root = os.path.join(worker_root, "final_shp")

    # Path to the whole image on the worker node.
    print ("Start processing image: ", work)
    input_img_name = work.split('/')[-1]
    input_img_path = os.path.join(worker_img_root, input_img_name)

    # Create a sub-folder for each image, named after the image file.
    img_stem = input_img_name.split('.tif')[0]
    worker_divided_img_subroot = os.path.join(worker_divided_img_root, img_stem)
    worker_output_shp_subroot = os.path.join(worker_output_shp_root, img_stem)
    worker_finaloutput_subroot = os.path.join(worker_finaloutput_root, img_stem)

    # Always start from empty directories, whether or not an earlier run
    # left anything behind.  Only the divided-image directory is used in
    # this function; the other two are presumably consumed by later stages.
    for subroot in (worker_divided_img_subroot,
                    worker_output_shp_subroot,
                    worker_finaloutput_subroot):
        _reset_dir(subroot)

    x_resolution, y_resolution = divide.divide_image(input_img_path,
                                                     worker_divided_img_subroot,
                                                     crop_size)
    return (x_resolution, y_resolution, worker_divided_img_root,
            worker_output_shp_root, worker_finaloutput_root)
def loadwork(queue, filelist):
    """Add every entry of ``filelist`` to ``queue``, keeping their order.

    Args:
        queue: Object with a ``put(item)`` method, e.g. ``queue.Queue``.
        filelist: Iterable of items to enqueue.
    """
    for entry in filelist:
        queue.put(entry)
def master(comm):
    """Distribute the ``.tif`` images of the dataset to the worker ranks.

    Builds a work queue from every ``*.tif`` entry in the ``imgs_path``
    command-line argument, sends one image path to each worker, then keeps
    handing a new path to whichever worker reports a result until the queue
    is empty.  Finally every worker is told to exit with ``DIETAG``.

    Workers that never receive a job (more workers than images) are not sent
    anything until the final ``DIETAG`` message.

    Args:
        comm: MPI communicator; ranks ``1 .. size-1`` are the workers.

    Raises:
        RuntimeError: If there are images to process but no worker ranks.
        OSError: If the dataset directory cannot be listed or a file's size
            cannot be read.
    """
    print ("Master starts working ...")
    num_procs = comm.Get_size()
    status = MPI.Status()

    # Generate the work queue on the master node.
    imgs_path = str(args.imgs_path)
    imgs_path_list = [os.path.join(imgs_path, img_name)
                      for img_name in os.listdir(imgs_path)
                      if img_name.endswith('.tif')]
    if imgs_path_list and num_procs < 2:
        # Without this check the master would block forever waiting for a
        # result that no rank can ever send.
        raise RuntimeError(
            "master needs at least one worker rank; run with 2 or more MPI processes")
    wq = Work(imgs_path_list)

    # Seed the workers: one unit of work per rank, as long as work exists.
    outstanding = 0
    for rank in range(1, num_procs):
        work = wq.get_next()
        if work is None:
            break
        comm.send(work, dest=rank, tag=WORKTAG)
        outstanding += 1

    # Collect one result per job in flight; whenever a worker reports back,
    # hand it the next job if any is left.
    while outstanding:
        result = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        outstanding -= 1
        #process_result(result)
        work = wq.get_next()
        if work is not None:
            comm.send(work, dest=status.Get_source(), tag=WORKTAG)
            outstanding += 1

    # Tell all the workers to exit by sending a dummy message with DIETAG.
    for rank in range(1, num_procs):
        comm.send(0, dest=rank, tag=DIETAG)
def worker(comm):
    """Process work items received from rank 0 until told to stop.

    Repeatedly receives a message from the master.  A message tagged
    ``DIETAG`` ends the loop; any other message is treated as a work item,
    passed to ``processing_img``, and the result is sent back to rank 0 and
    printed.

    Args:
        comm: MPI communicator shared with the master (rank 0).
    """
    print ("Worker starts working ...")
    my_rank = comm.Get_rank()
    my_name = MPI.Get_processor_name()
    status = MPI.Status()
    while True:
        # Receive a message from the master.
        work = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
        # The tag, not the payload, decides whether to stop.
        if status.Get_tag() == DIETAG:
            break
        # Do the work.
        result = processing_img(my_rank, my_name, work)
        # Send the result back to the master.
        comm.send(result, dest=0, tag=0)
        # Printed inside the loop so it never refers to an unset result.
        print(result)
def main():
    """Run the master on rank 0 and a worker on every other rank, timed.

    All ranks meet at a barrier before and after the work; each rank then
    prints the wall-clock time measured between the two barriers.
    """
    world = MPI.COMM_WORLD
    rank = world.Get_rank()
    _host = MPI.Get_processor_name()  # looked up but not used below

    # Line all ranks up before starting the clock.
    world.Barrier()
    t_begin = MPI.Wtime()

    role = master if rank == 0 else worker
    role(world)

    # Wait until every rank has finished before stopping the clock.
    world.Barrier()
    t_end = MPI.Wtime()
    print("Total time: ", t_end - t_begin)
# Script entry point: start the MPI driver only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment