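"""MPI master/worker pipeline that divides large GeoTIFF images into crops.

Rank 0 (the master) builds a queue of .tif images sorted largest-first and
hands them out one at a time; every other rank is a worker that crops each
image it receives via iwp_divideimg.divide_image.
"""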
import os
import os.path
import shutil
import argparse
from queue import Queue

from mpi4py import MPI

import iwp_divideimg as divide
parser = argparse.ArgumentParser()
parser.add_argument('imgs_path', help='Path of the input image dataset')
parser.add_argument('worker_root', help='Main working directory')
parser.add_argument('crop_size', type=int, help='Size of each divided image')
args = parser.parse_args()
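# Directory layout assumed under worker_root (inferred from the paths built in
# processing_img below): input_img/ holds the staged source images, divided_img/
# the crops, output_shp/ and final_shp/ the shapefile outputs. Note the script
# does not copy images into input_img/ itself; each image is expected to have
# been staged there already.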
class Work(object):
    """Queue of image paths for the master to hand out to workers."""

    def __init__(self, files):
        # Important: sort by file size in decreasing order, so the largest
        # (slowest) images are dispatched first and stragglers are minimized.
        files.sort(key=lambda f: os.stat(f).st_size, reverse=True)
        q = Queue()
        for f in files:
            q.put(f)
        self.work = q

    def get_next(self):
        if self.work.empty():
            return None
        return self.work.get()
WORKTAG = 1
DIETAG = 0
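# Message protocol: the master tags real work units with WORKTAG; a message
# tagged DIETAG tells a worker to stop (its payload is ignored).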
def processing_img(rank, name, work):
    worker_root = str(args.worker_root)
    crop_size = args.crop_size
    worker_img_root = os.path.join(worker_root, "input_img")
    worker_divided_img_root = os.path.join(worker_root, "divided_img")
    worker_output_shp_root = os.path.join(worker_root, "output_shp")
    worker_finaloutput_root = os.path.join(worker_root, "final_shp")
    # Path to the whole image on the worker node.
    print("Start processing image: ", work)
    input_img_name = os.path.basename(work)
    input_img_path = os.path.join(worker_img_root, input_img_name)
    # Root path for the polygon module (currently unused in this function).
    POLYGON_DIR = worker_root
    # Create a fresh subfolder per image, removing any leftovers from a
    # previous run before recreating the directories.
    img_stem = input_img_name.split('.tif')[0]
    worker_divided_img_subroot = os.path.join(worker_divided_img_root, img_stem)
    worker_output_shp_subroot = os.path.join(worker_output_shp_root, img_stem)
    worker_finaloutput_subroot = os.path.join(worker_finaloutput_root, img_stem)
    for subroot in (worker_divided_img_subroot,
                    worker_output_shp_subroot,
                    worker_finaloutput_subroot):
        shutil.rmtree(subroot, ignore_errors=True)
        os.makedirs(subroot)
    x_resolution, y_resolution = divide.divide_image(input_img_path,
                                                     worker_divided_img_subroot,
                                                     crop_size)
    return (x_resolution, y_resolution, worker_divided_img_root,
            worker_output_shp_root, worker_finaloutput_root)
def loadwork(queue, filelist):
    # Helper to enqueue a list of files; not called anywhere in this script
    # (Work.__init__ builds the queue itself).
    for f in filelist:
        queue.put(f)
def master(comm):
    print("Master starts working ...")
    num_procs = comm.Get_size()
    status = MPI.Status()
    # Generate the work queue on the master node.
    imgs_path = str(args.imgs_path)
    imgs_path_list = [os.path.join(imgs_path, img_name)
                      for img_name in os.listdir(imgs_path)
                      if img_name.endswith('.tif')]
    wq = Work(imgs_path_list)
    # Seed the workers: send one unit of work to each rank.
    active_workers = 0
    for rank in range(1, num_procs):
        work = wq.get_next()
        if work is None:
            break  # fewer images than workers
        comm.send(work, dest=rank, tag=WORKTAG)
        active_workers += 1
    # Keep handing out work until the queue is empty.
    while True:
        work = wq.get_next()
        if work is None:
            break
        # Receive a result from whichever worker finishes first ...
        result = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        # ... and send that worker a new unit of work.
        comm.send(work, dest=status.Get_source(), tag=WORKTAG)
    # No more work to hand out; drain the outstanding results.
    for _ in range(active_workers):
        result = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
    # Tell every worker to exit by sending a message tagged DIETAG.
    for rank in range(1, num_procs):
        comm.send(None, dest=rank, tag=DIETAG)
def worker(comm):
    print("Worker starts working ...")
    my_rank = comm.Get_rank()
    my_name = MPI.Get_processor_name()
    status = MPI.Status()
    while True:
        # Receive a message from the master.
        work = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
        # Check the tag of the received message.
        if status.Get_tag() == DIETAG:
            break
        # Do the work.
        result = processing_img(my_rank, my_name, work)
        # Send the result back to the master.
        comm.send(result, dest=0, tag=0)
        print(result)
def main():
    comm = MPI.COMM_WORLD
    my_rank = comm.Get_rank()
    # Time the whole run across all ranks.
    comm.Barrier()
    start = MPI.Wtime()
    if my_rank == 0:
        master(comm)
    else:
        worker(comm)
    comm.Barrier()
    end = MPI.Wtime()
    if my_rank == 0:
        print("Total time: ", end - start)


if __name__ == '__main__':
    main()
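To run the script under MPI, launch one process per core with mpiexec; rank 0 becomes the master and the remaining ranks become workers. A minimal invocation, assuming the gist is saved as mpi_divide_images.py (the filename, process count, and crop size here are illustrative):

mpiexec -n 8 python mpi_divide_images.py /path/to/input_imgs /path/to/worker_root 200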