-
-
Save smr547/52212f84421d4f509e62 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import numpy | |
import luigi | |
import luigi.contrib.mpi as mpi | |
import cPickle as pickle | |
from os.path import exists | |
import time | |
from datetime import datetime | |
from mpi_log_utils import MPILogFilter | |
import os | |
import logging | |
class QueryDB(luigi.Task): | |
def requires(self): | |
return [] | |
def output(self): | |
return luigi.LocalTarget('query_results.pkl') | |
def run(self): | |
logging.info('Querying DB') | |
print 'Querying DB' | |
time.sleep(10) # simulate a modest DB query/resultset cycle | |
value = numpy.random.randint(1, 33, (1))[0] | |
value = {'value': value} | |
with self.output().open('w') as outf: | |
pickle.dump(value, outf) | |
logging.info('Done querying DB') | |
class ComputeComplexFunc(luigi.Task): | |
out_fname = luigi.Parameter() | |
out_value = luigi.Parameter() | |
def requires(self): | |
return [] | |
def output(self): | |
return luigi.LocalTarget(self.out_fname) | |
def run(self): | |
logging.debug("ComputeComplexFunc.run() called") | |
time.sleep(numpy.random.randint(1, 33, (1))[0]) | |
with open(self.out_fname, 'w') as outf: | |
outf.write(bytes(datetime.now())) | |
outf.write(bytes(self.out_value)) | |
msg = 'Value {} written.'.format(self.out_value) | |
logging.info(msg) | |
print msg | |
class DefineTasks(luigi.Task): | |
def requires(self): | |
logging.debug("DefineTasks.requires() called") | |
tasks = [] | |
with open('query_results.pkl', 'r') as f: | |
values = pickle.load(f) | |
for i in range(values['value']): | |
out_fname = 'file_{}.txt'.format(i) | |
tasks.append(ComputeComplexFunc(out_fname, i)) | |
return tasks | |
def output(self): | |
return luigi.LocalTarget('DefineTasks.completed') | |
def run(self): | |
logging.info('DefineTasks has completed!') | |
with self.output().open('w') as outf: | |
outf.write('Completed') | |
if __name__ == '__main__': | |
# setup logging | |
log_dir = "." | |
logfile = "%s/mpi_process_%s_%d_rank_%d.log" % \ | |
(log_dir, os.uname()[1], os.getpid(), MPILogFilter.get_rank()) | |
logging.basicConfig( \ | |
filename=logfile, \ | |
format='%(asctime)s: [%(name)s] (%(levelname)s) %(message)s {rank=%(mpi_rank)d}', \ | |
level=logging.DEBUG) | |
logging.root.handlers[0].addFilter(MPILogFilter()) # supplies MPI properties | |
# do some work -- stage 1 | |
logging.debug("creating intial task list") | |
tasks = [QueryDB()] | |
logging.debug("executing mpi.run() in stage 1") | |
mpi.run(tasks) | |
logging.debug("finished executing mpi.run() stage 1, now defining stage 2 tasks") | |
# do some work -- stage 2 | |
tasks = [DefineTasks()] | |
logging.debug("executing mpi.run() in stage 2") | |
mpi.run(tasks) | |
logging.debug("finished executing mpi.run() stage 2, All done!!") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python | |
""" | |
MPI loging | |
""" | |
import os | |
import logging | |
from mpi4py import MPI | |
class MPILogFilter(logging.Filter): | |
@staticmethod | |
def get_rank(): | |
return MPI.COMM_WORLD.Get_rank() | |
""" | |
A logging Filter that simply adds MPI status attributes to | |
a LogRecord during log event processing | |
""" | |
def filter(self, record): | |
""" | |
Add MPI status attributes to the supplied LogRecord | |
""" | |
record.mpi_rank = MPILogFilter.get_rank() | |
return True # we allow the record to be processed |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
#PBS -P v10 | |
#PBS -q express | |
#PBS -l walltime=00:10:00,mem=1GB,ncpus=32 | |
#PBS -l wd | |
#PBS -me | |
#PBS -M [email protected] | |
module load python/2.7.6 | |
module use /projects/u46/opt/modules/modulefiles | |
module load luigi-mpi | |
mpirun -n 32 python mock_workflow.py |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment