#!/usr/bin/env python
import numpy
import luigi
import luigi.contrib.mpi as mpi
import cPickle as pickle
from os.path import exists
import time
from datetime import datetime
from mpi_log_utils import MPILogFilter
import os
import logging
class QueryDB(luigi.Task):
    """Stage-1 task that simulates a database query and pickles the result.

    The result is a dict ``{'value': n}`` where ``n`` is a random integer
    in the range 1..32, written to ``query_results.pkl``.
    """

    def requires(self):
        """Return the upstream tasks; this task has none."""
        return []

    def output(self):
        """Return the local target that holds the pickled query result."""
        return luigi.LocalTarget('query_results.pkl')

    def run(self):
        """Sleep to mimic a query, then pickle ``{'value': n}`` to the output."""
        logging.debug('Querying DB')
        print('Querying DB')
        time.sleep(10)  # simulate a modest DB query/resultset cycle
        # randint's upper bound is exclusive, so this yields 1..32.  Cast to
        # a plain int so the pickle does not carry a numpy scalar.
        value = {'value': int(numpy.random.randint(1, 33))}
        with self.output().open('w') as outf:
            pickle.dump(value, outf)
        logging.debug('Done querying DB')
class ComputeComplexFunc(luigi.Task):
    """Stage-2 task that simulates an expensive computation.

    Parameters:
        out_fname: path of the file this task produces.
        out_value: value reported in the message written to that file.
    """

    out_fname = luigi.Parameter()
    out_value = luigi.Parameter()

    def requires(self):
        """Return the upstream tasks; this task has none."""
        return []

    def output(self):
        """Return the local target at ``out_fname``."""
        return luigi.LocalTarget(self.out_fname)

    def run(self):
        """Sleep 1..32 seconds, then write and print the completion message."""
        logging.debug("ComputeComplexFunc.run() called")
        # randint's upper bound is exclusive, so the sleep is 1..32 seconds.
        time.sleep(int(numpy.random.randint(1, 33)))
        msg = 'Value {} written.'.format(self.out_value)
        # Write through the task's own target (same path as out_fname) so the
        # output file actually contains the message it announces.
        with self.output().open('w') as outf:
            outf.write(msg)
        print(msg)
class DefineTasks(luigi.Task):
    """Stage-2 wrapper task that fans out into ``ComputeComplexFunc`` tasks.

    The number of sub-tasks is read from ``query_results.pkl``, so that file
    must already exist when ``requires()`` is evaluated.
    """

    def requires(self):
        """Return one ``ComputeComplexFunc`` per unit of the pickled value.

        Sub-task ``i`` writes to ``file_<i>.txt`` and is given ``i`` as its
        ``out_value``.
        """
        logging.debug("DefineTasks.requires() called")
        # NOTE: unpickling is only safe because this file is expected to be
        # produced by this same pipeline; never point it at untrusted data.
        with open('query_results.pkl', 'r') as f:
            values = pickle.load(f)
        return [
            ComputeComplexFunc('file_{}.txt'.format(i), i)
            for i in range(values['value'])
        ]

    def output(self):
        """Return the marker target that records completion of this task."""
        return luigi.LocalTarget('DefineTasks.completed')

    def run(self):
        """Log completion and create the (empty) completion marker file."""
        logging.debug('DefineTasks has completed!')
        # Opening and closing the target is enough to create the marker file.
        with self.output().open('w'):
            pass
if __name__ == '__main__':
    # setup logging: one log file per process, named after the host, the
    # process id and the MPI rank
    log_dir = "."
    logfile = "%s/mpi_process_%s_%d_rank_%d.log" % (
        log_dir, os.uname()[1], os.getpid(), MPILogFilter.get_rank())
    # The level must be DEBUG: every message below is emitted with
    # logging.debug() and would be dropped at the default WARNING level.
    logging.basicConfig(
        filename=logfile,
        format='%(asctime)s: [%(name)s] (%(levelname)s) %(message)s {rank=%(mpi_rank)d}',
        level=logging.DEBUG)
    # The format string references %(mpi_rank)d, which only exists on
    # records that have passed through this filter.
    logging.root.handlers[0].addFilter(MPILogFilter())  # supplies MPI properties

    # do some work -- stage 1
    logging.debug("creating intial task list")
    tasks = [QueryDB()]
    logging.debug("executing in stage 1")
    # NOTE(review): assumes luigi.contrib.mpi exposes run(tasks) -- confirm
    # against the installed luigi-mpi module.
    mpi.run(tasks)
    logging.debug("finished executing stage 1, now defining stage 2 tasks")

    # do some work -- stage 2
    # DefineTasks reads query_results.pkl, so it must not be built or run
    # before stage 1 has finished.
    tasks = [DefineTasks()]
    logging.debug("executing in stage 2")
    mpi.run(tasks)
    logging.debug("finished executing stage 2, All done!!")
#!/bin/env python
# MPI logging
import os
import logging
from mpi4py import MPI
class MPILogFilter(logging.Filter):
    """
    A logging Filter that simply adds MPI status attributes to
    a LogRecord during log event processing
    """

    @staticmethod
    def get_rank():
        """Return the rank of the calling process in ``MPI.COMM_WORLD``."""
        # Declared static because it takes no instance and is called
        # directly on the class.
        return MPI.COMM_WORLD.Get_rank()

    def filter(self, record):
        """
        Add MPI status attributes to the supplied LogRecord

        Sets ``record.mpi_rank`` and always returns True.
        """
        record.mpi_rank = MPILogFilter.get_rank()
        return True  # we allow the record to be processed
# PBS batch job that loads the luigi-mpi environment and launches python
# under mpirun with 32 processes.
# Directives below: project v10, express queue, 10 minutes walltime,
# 1GB memory, 32 CPUs; the remaining ones set the working-directory and
# mail options -- NOTE(review): confirm "-l wd" and "-me" against the
# site's PBS documentation.
#PBS -P v10
#PBS -q express
#PBS -l walltime=00:10:00,mem=1GB,ncpus=32
#PBS -l wd
#PBS -me
#PBS -M [email protected]
# Environment: Python 2.7.6 plus the luigi-mpi module from the project
# module tree.
module load python/2.7.6
module use /projects/u46/opt/modules/modulefiles
module load luigi-mpi
# The process count matches ncpus=32 requested above.
# NOTE(review): no script path follows "python" here -- the argument
# appears to be missing; confirm which script this job should run.
mpirun -n 32 python
