Last active
August 4, 2016 06:14
-
-
Save sixy6e/47e77aad0b3c3d5fefb2 to your computer and use it in GitHub Desktop.
luigi mpi mock
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import numpy | |
import luigi | |
import luigi.contrib.mpi as mpi | |
import cPickle as pickle | |
from os.path import exists | |
import time | |
from datetime import datetime | |
class QueryDB(luigi.Task):
    """Mock database query that seeds the rest of the workflow.

    Draws one random integer and pickles it as ``{'value': n}`` into
    ``query_results.pkl``.
    """

    def requires(self):
        """Return the upstream tasks; this task has none."""
        return []

    def output(self):
        """Return the local target that holds the pickled query result."""
        return luigi.LocalTarget('query_results.pkl')

    def run(self):
        """Simulate the query and persist its result."""
        print('Querying DB')
        # randint's upper bound is exclusive, so the draw lies in 1..32.
        drawn = numpy.random.randint(1, 33, (1))
        result = {'value': drawn[0]}
        with self.output().open('w') as handle:
            pickle.dump(result, handle)
class ComputeComplexFunc(luigi.Task):
    """Mock expensive computation: sleep a random time, then write a file.

    Parameters:
        out_fname: path of the file this task produces.
        out_value: value written to the file directly after the timestamp.
    """

    out_fname = luigi.Parameter()
    out_value = luigi.Parameter()

    def requires(self):
        """Return the upstream tasks; this task has none."""
        return []

    def output(self):
        """Return the local target located at ``out_fname``."""
        return luigi.LocalTarget(self.out_fname)

    def run(self):
        """Sleep for a random whole number of seconds, then write the output."""
        # randint's upper bound is exclusive, so the pause is 1..32 seconds.
        time.sleep(numpy.random.randint(1, 33, (1))[0])
        # Write through the target rather than a bare open(): LocalTarget
        # writes to a temporary file and moves it into place on close, so an
        # interrupted run never leaves a partial file that Luigi would treat
        # as a completed task. This also matches the other tasks in the file.
        with self.output().open('w') as outf:
            # The timestamp and the value are written back to back with no
            # separator, exactly as before.
            outf.write(str(datetime.now()))
            outf.write(str(self.out_value))
        print('Value {} written.'.format(self.out_value))
class DefineTasks(luigi.Task):
    """Fan out one ComputeComplexFunc task per unit of the query result.

    The number of tasks is read from ``query_results.pkl``, so that file
    must already exist when ``requires()`` is evaluated.
    """

    def requires(self):
        """Build the ComputeComplexFunc tasks from the pickled result."""
        with open('query_results.pkl', 'r') as src:
            query = pickle.load(src)
        # One task per index; each writes its own file_<index>.txt.
        return [
            ComputeComplexFunc('file_{}.txt'.format(idx), idx)
            for idx in range(query['value'])
        ]

    def output(self):
        """Return the marker target that signals the fan-out finished."""
        return luigi.LocalTarget('DefineTasks.completed')

    def run(self):
        """Announce completion and write the marker file."""
        print('DefineTasks has completed!')
        with self.output().open('w') as marker:
            marker.write('Completed')
if __name__ == '__main__':
    # Stage 1: run the mock query so that query_results.pkl is produced.
    mpi.run([QueryDB()])
    # Stage 2: DefineTasks reads query_results.pkl to decide how many
    # ComputeComplexFunc tasks to fan out, so it is scheduled in a second run.
    mpi.run([DefineTasks()])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# PBS job script that launches the mock Luigi/MPI workflow on 32 ranks.

# Project to charge and queue to submit to.
#PBS -P v10
#PBS -q express
# Resource requests, one per directive.
#PBS -l walltime=00:10:00
#PBS -l mem=1GB
#PBS -l ncpus=32
# NOTE(review): "wd" is presumably the site option that starts the job in the
# submission directory - confirm against the cluster documentation.
#PBS -l wd
# Send mail when the job ends.
#PBS -m e
# NOTE(review): the address below looks redacted; restore it before submitting.
#PBS -M [email protected]

module load python/2.7.6
module use /projects/u46/opt/modules/modulefiles
module load luigi-mpi

# The rank count matches the ncpus request above.
mpirun -n 32 python mock_workflow.py
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I've played with this and it looks really cool. I forked the gist to do some experiments. See: https://gist.github.com/smr547/52212f84421d4f509e62
I've added a
sleep(10)
to the QueryDB.run()
method to simulate the delay experienced in a real DB query and also added some detailed logging to per-process log files. The resulting logs showed that all processes (except the one executing the query) will complete the first
mpi.run()
call and enter the second mpi.run()
call before the query is complete and before its results are available in the pickle file. At first look, this might seem bad! However, all processes (slaves) synchronise with the Master and pause until the query is complete. This is fortunate, but not what I was expecting.
The following log extract is from slave 29, around the time the query is being executed by slave 16. Note the 10-second delay while
Slave 29 waiting to sync with Master