-
-
Save sixy6e/47e77aad0b3c3d5fefb2 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
import numpy | |
import luigi | |
import luigi.contrib.mpi as mpi | |
import cPickle as pickle | |
from os.path import exists | |
import time | |
from datetime import datetime | |
class QueryDB(luigi.Task): | |
def requires(self): | |
return [] | |
def output(self): | |
return luigi.LocalTarget('query_results.pkl') | |
def run(self): | |
print 'Querying DB' | |
value = numpy.random.randint(1, 33, (1))[0] | |
value = {'value': value} | |
with self.output().open('w') as outf: | |
pickle.dump(value, outf) | |
class ComputeComplexFunc(luigi.Task): | |
out_fname = luigi.Parameter() | |
out_value = luigi.Parameter() | |
def requires(self): | |
return [] | |
def output(self): | |
return luigi.LocalTarget(self.out_fname) | |
def run(self): | |
time.sleep(numpy.random.randint(1, 33, (1))[0]) | |
with open(self.out_fname, 'w') as outf: | |
outf.write(bytes(datetime.now())) | |
outf.write(bytes(self.out_value)) | |
print 'Value {} written.'.format(self.out_value) | |
class DefineTasks(luigi.Task): | |
def requires(self): | |
tasks = [] | |
with open('query_results.pkl', 'r') as f: | |
values = pickle.load(f) | |
for i in range(values['value']): | |
out_fname = 'file_{}.txt'.format(i) | |
tasks.append(ComputeComplexFunc(out_fname, i)) | |
return tasks | |
def output(self): | |
return luigi.LocalTarget('DefineTasks.completed') | |
def run(self): | |
print 'DefineTasks has completed!' | |
with self.output().open('w') as outf: | |
outf.write('Completed') | |
if __name__ == '__main__':
    # Stage 1: run the DB query on its own so its pickled output exists
    # before stage 2 builds its (dynamically sized) task graph.
    mpi.run([QueryDB()])
    # Stage 2: fan out the compute tasks defined by the query result.
    mpi.run([DefineTasks()])
#!/bin/bash
# PBS submission script: 32-way MPI run of the luigi mock workflow.
#PBS -P v10
#PBS -q express
#PBS -l walltime=00:10:00,mem=1GB,ncpus=32
#PBS -l wd
#PBS -me
#PBS -M [email protected]

# Environment: Python 2.7 plus the site-local luigi-mpi module.
module load python/2.7.6
module use /projects/u46/opt/modules/modulefiles
module load luigi-mpi

mpirun -n 32 python mock_workflow.py
I've played with this and it looks really cool. I forked the gist to do some experiments. See: https://gist.github.com/smr547/52212f84421d4f509e62
I've added a sleep(10) to the QueryDB.run() method to simulate the delay experienced in a real DB query, and also added some detailed logging to per-process log files.
The resulting logs showed that all processes (except the one executing the query) will complete the first mpi.run() call and enter the second mpi.run() call before the query is complete and before its results are available in the pickle file.
At first look, this might seem bad!! However, all processes (slaves) synchronise with the Master and pause until the query is complete. This is fortunate, but not what I was expecting.
The following log extract is from slave 29, around the time the query is being executed by slave 16. Note the 10-second delay while slave 29 waits to sync with the Master.
...
2015-04-10 16:39:34,622: [luigi-interface] (INFO) There are no more tasks to run at this time {rank=29}
2015-04-10 16:39:34,622: [luigi-interface] (INFO) QueryDB() is currently run by worker Worker(salt=715810340, workers=1, host=r80, username=smr547, pid=12316, rank=16) {rank=29}
2015-04-10 16:39:34,622: [root] (DEBUG) finished executing mpi.run() stage 1, now defining stage 2 tasks {rank=29}
2015-04-10 16:39:34,623: [root] (DEBUG) executing mpi.run() in stage 2 {rank=29}
2015-04-10 16:39:34,623: [luigi-interface] (DEBUG) Slave 29 waiting to sync with Master {rank=29}
2015-04-10 16:39:44,644: [luigi-interface] (DEBUG) Slave 29 locally updated task status. {rank=29}
2015-04-10 16:39:44,645: [luigi-interface] (DEBUG) Checking if DefineTasks() is complete {rank=29}
2015-04-10 16:39:44,646: [root] (DEBUG) DefineTasks.requires() called {rank=29}
...
A little toy example where the number of files to be produced is unknown until the query has been run.