Code for merging OTU tables in parallel, written by Daniel McDonald.
Created October 10, 2012 16:17
Script for running merge_otu_tables.py in parallel
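The approach, in brief: the library builds a binary dependency tree over the input tables (the same divide-and-conquer recursion as merge sort), so independent pairwise merges can run concurrently and intermediate tables are merged as they become available. Each merge command is wrapped so that it writes its exit status to a per-node poll file, either for local execution or for submission to a Torque queue via qsub; the driver script watches the poll files and schedules a node as soon as both of its children are complete. The gist contains three files: the merge library, the driver script, and unit tests.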
lib_parallel_merge_otu_tables.py
#!/usr/bin/env python

__author__ = "Daniel McDonald"
__copyright__ = "Copyright 2011, The QIIME Project"
__credits__ = ["Daniel McDonald", "Greg Caporaso"]
__license__ = "GPL"
__version__ = "1.5.0-dev"
__maintainer__ = "Daniel McDonald"
__email__ = "[email protected]"
__status__ = "Development"

import os
from os import system
from os.path import basename, join
from time import time

from cogent.core.tree import TreeNode


class JobError(Exception):
    pass


def mergetree(left, right, working_dir):
    """Reconstruct a tree from merge order"""
    # decorate and infer filenames for tips
    if not isinstance(left, TreeNode):
        filepath = str(left[0])
        name = basename(filepath.split('.')[0])
        left = TreeNode(Name=name)
        left.FilePath = filepath
        left.Processed = False
        left.PollPath = None  # doesn't make sense for tips
        left.FullCommand = None
        left.EndTime = None
        left.StartTime = None
        left.TotalTime = None

    if not isinstance(right, TreeNode):
        filepath = str(right[0])
        name = basename(filepath.split('.')[0])
        right = TreeNode(Name=name)
        right.FilePath = filepath
        right.Processed = False
        right.PollPath = None  # doesn't make sense for tips
        right.FullCommand = None
        right.EndTime = None
        right.StartTime = None
        right.TotalTime = None

    # internal node: named for its children, with a poll file to watch
    name = '_'.join([left.Name, right.Name])
    filepath = join(working_dir, name) + '.biom'
    merged = TreeNode(Name=name, Children=[left, right])
    merged.FilePath = filepath
    merged.Processed = False
    merged.PollPath = filepath + '.poll'
    merged.FullCommand = None
    merged.EndTime = None
    merged.StartTime = None
    merged.TotalTime = None

    return merged


def mergeorder(items, working_dir):
    """Code taken from http://en.literateprograms.org/Merge_sort_(Python)"""
    if len(items) < 2:
        return items

    middle = len(items) // 2
    left = mergeorder(items[:middle], working_dir)
    right = mergeorder(items[middle:], working_dir)

    return mergetree(left, right, working_dir)


def initial_nodes_to_merge(tree):
    """Determine what nodes are safe to process first

    The first nodes to process are those internal nodes that have tips as
    children
    """
    to_process = set([])
    for n in tree.tips():
        sibs_are_tips = [s.istip() for s in n.siblings()]
        if all(sibs_are_tips):
            to_process.add(n.Parent)
    return to_process


def initial_has_dependencies(tree, to_process):
    """All nodes that aren't processed up front still have dependencies

    to_process : set of nodes that are in the first round of processing
    """
    has_dependencies = []
    for n in tree.nontips(include_self=True):
        if n not in to_process:
            has_dependencies.append(n)
    return has_dependencies


def job_complete(node, verbose=False):
    """Check if the job is complete"""
    if node.PollPath is None or node.istip():
        raise JobError, "Attempting to merge tip: %s" % node.Name
    if node.Processed:
        raise JobError, "Already processed node: %s" % node.Name

    if os.path.exists(node.PollPath):
        node.EndTime = time()
        node.TotalTime = node.EndTime - node.StartTime
        node.ExitStatus = open(node.PollPath).read().strip()

        if node.ExitStatus != '0':
            raise JobError, "Node %s did not complete correctly!" % node.Name

        if verbose:
            print "finishing %s, %f seconds" % (node.Name, node.TotalTime)

        node.Processed = True
        return True
    else:
        return False


def torque_job(cmd, pollpath, name, queue="memroute", mem="64gb",
               walltime="999:00:00"):
    """Wrap a cmd for job submission"""
    qsub_call = "qsub -k oe -N %s -q %s -l walltime=%s -l pvmem=%s" % (
        name, queue, walltime, mem)
    to_submit = 'echo "%s; echo $? > %s" | %s' % (cmd, pollpath, qsub_call)
    return to_submit


def local_job(cmd, pollpath, name):
    """make a local job"""
    to_submit = '%s; echo $? > %s' % (cmd, pollpath)
    return to_submit


def start_job(node, python_exe_fp, merge_otus_fp, wrap_call=torque_job,
              submit=True):
    """Starts a process"""
    strfmt = {'Python': python_exe_fp,
              'MergeOTUs': merge_otus_fp,
              'Output': node.FilePath,
              'BIOM_A': node.Children[0].FilePath,
              'BIOM_B': node.Children[1].FilePath}
    cmd = "%(Python)s %(MergeOTUs)s -i %(BIOM_A)s,%(BIOM_B)s -o %(Output)s"
    wrapped = wrap_call(cmd % strfmt, node.PollPath, node.Name)

    if submit:
        system(wrapped)

    node.FullCommand = wrapped
    node.StartTime = time()
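A quick sketch of how these pieces compose. This is illustrative only, not part of the gist: the table paths and working directory are hypothetical, and with submit=False no job is actually launched.

from lib_parallel_merge_otu_tables import (mergeorder,
                                           initial_nodes_to_merge,
                                           start_job, torque_job)

# hypothetical inputs and working directory
tables = ['A.biom', 'B.biom', 'C.biom', 'D.biom']
tree = mergeorder(tables, '/tmp/work')

# visualize the merge dependency tree: ((A,B)A_B,(C,D)C_D)A_B_C_D
print tree.asciiArt()

# the first wave of jobs: internal nodes whose children are all tips
for node in initial_nodes_to_merge(tree):
    # submit=False only builds the command; nothing is run
    start_job(node, 'python', 'merge_otu_tables.py',
              wrap_call=torque_job, submit=False)
    print node.FullCommand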
Driver script for the parallel merge
#!/usr/bin/env python
from __future__ import division

__author__ = "Daniel McDonald"
__copyright__ = "Copyright 2011, The QIIME Project"
__credits__ = ["Daniel McDonald", "Greg Caporaso"]
__license__ = "GPL"
__version__ = "1.5.0-dev"
__maintainer__ = "Daniel McDonald"
__email__ = "[email protected]"
__status__ = "Development"

import os
from os import popen, system, makedirs, mkdir
from os.path import split, splitext, join
from random import choice
from subprocess import check_call, CalledProcessError
from time import sleep, time

from cogent.core.tree import TreeNode
from qiime.util import (make_option, parse_command_line_parameters,
                        load_qiime_config, get_options_lookup,
                        get_qiime_scripts_dir, get_tmp_filename)

from lib_parallel_merge_otu_tables import (start_job, local_job, torque_job,
                                           job_complete,
                                           initial_has_dependencies,
                                           initial_nodes_to_merge,
                                           mergeorder, mergetree)

qiime_config = load_qiime_config()
options_lookup = get_options_lookup()

script_info = {}
script_info['brief_description'] = """Parallel merge BIOM tables"""
script_info['script_description'] = """This script works like the merge_otu_tables.py script, but is intended to make use of multicore/multiprocessor environments to perform analyses in parallel."""
script_info['script_usage'] = []
script_info['script_usage'].append(
    ("""Example""",
     """Merge the OTU tables $PWD/table1.biom and $PWD/table2.biom and write the resulting table to the $PWD/merged_table/ directory. ALWAYS SPECIFY ABSOLUTE FILE PATHS (absolute path represented here as $PWD, but will generally look something like /home/ubuntu/my_analysis/).""",
     """%prog -i $PWD/table1.biom,$PWD/table2.biom -o $PWD/merged_table/"""))
script_info['output_description'] = """The output consists of many files (i.e. merged_table.biom, merged_table.log and all intermediate merge tables). The .biom file contains the result of merging the individual BIOM tables. The resulting .log file contains a list of parameters passed to this script along with the output location of the resulting .biom file, the dependency hierarchy and runtime information for each individual merge."""
script_info['required_options'] = [
    make_option('-i', '--input_fps', type='existing_filepaths',
                help='the otu tables in biom format (comma-separated)'),
    make_option('-o', '--output_fp', type='new_filepath',
                help='the output directory for the merged table')]
script_info['optional_options'] = [
    make_option('-N', '--merge_otus_fp', action='store',
                type='existing_filepath',
                help='full path to scripts/merge_otu_tables.py '
                     '[default: %default]',
                default=join(get_qiime_scripts_dir(), 'merge_otu_tables.py')),
    options_lookup['python_exe_fp'],
    options_lookup['seconds_to_sleep'],
    options_lookup['job_prefix']]
script_info['version'] = __version__


def get_random_job_prefix(prefix):
    """Return prefix with five random letters appended"""
    x = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    x = x + x.lower()
    return prefix + ''.join([choice(x) for e in range(5)])


def main():
    option_parser, opts, args =\
        parse_command_line_parameters(**script_info)

    input_fps = opts.input_fps
    python_exe_fp = opts.python_exe_fp
    output_fp = opts.output_fp
    merge_otus_fp = opts.merge_otus_fp
    seconds_to_sleep = opts.seconds_to_sleep
    verbose = opts.verbose
    # unused here; carried over from QIIME's standard parallel wrappers:
    #cluster_jobs_fp = opts.cluster_jobs_fp
    #jobs_to_start = opts.jobs_to_start
    #poller_fp = opts.poller_fp
    #retain_temp_files = opts.retain_temp_files
    #suppress_polling = opts.suppress_polling
    #poll_directly = opts.poll_directly

    created_temp_paths = []

    # set the job_prefix either based on what the user passed in,
    # or a random string beginning with MOTU
    job_prefix = opts.job_prefix or get_random_job_prefix('MOTU')

    # A temporary output directory is created in output_dir named
    # job_prefix. Output files are then moved from the temporary
    # directory to the output directory when they are complete, allowing
    # a poller to detect when runs complete by the presence of their
    # output files.
    working_dir = '%s/%s' % (output_fp, job_prefix)
    try:
        makedirs(working_dir)
    except OSError:
        # working dir already exists
        pass

    # wrapper log output contains run details
    log_fp = os.path.join(working_dir, 'parallel_merge_otus.log')
    if os.path.exists(log_fp):
        raise IOError, "log file already exists!"

    wrapper_log_output = open(log_fp, 'w')
    wrapper_log_output.write("Parallel merge output\n\n")

    # (disabled) munge input filenames and log the mapping
    #filenames = munge_filenames(input_fps)
    #wrapper_log_output.write("Munge file mapping:\n")
    #for m, f in zip(filenames, input_fps):
    #    wrapper_log_output.write('\t'.join([m, f]))
    #    wrapper_log_output.write('\n')
    #wrapper_log_output.write('\n')
    #wrapper_log_output.flush()

    # construct the dependency tree
    tree = mergeorder(input_fps, working_dir)

    if verbose:
        print tree.asciiArt()

    wrapper_log_output.write('Dependency tree:\n')
    wrapper_log_output.write(tree.asciiArt())
    wrapper_log_output.write('\n\n')
    wrapper_log_output.flush()

    to_process = initial_nodes_to_merge(tree)
    has_dependencies = initial_has_dependencies(tree, to_process)

    # loop until the whole shebang is done
    pending = []  # jobs that are currently running

    while not tree.Processed:
        # check if we have nodes to process, if so, shoot them off
        for node in to_process:
            start_job(node, python_exe_fp, merge_otus_fp, wrap_call=local_job)

            wrapper_log_output.write(node.FullCommand)
            wrapper_log_output.write('\n')
            wrapper_log_output.flush()

            pending.append(node)
        to_process = set([])

        # check running jobs
        current_pending = []
        for pending_node in pending:
            # if we're complete, update state
            if job_complete(pending_node):
                wrapper_log_output.write("Node %s completed in %f seconds" %
                                         (pending_node.Name,
                                          pending_node.TotalTime))
                wrapper_log_output.write('\n')
                wrapper_log_output.flush()
            else:
                current_pending.append(pending_node)
        pending = current_pending

        # check for new jobs to add
        current_dependencies = []
        for dep_node in has_dependencies:
            # if children are satisfied, then allow for processing; the logic
            # here handles the case where an internal node has both a tip
            # child and an internal-node child
            children_are_complete = [(c.Processed or c.istip())
                                     for c in dep_node.Children]
            if all(children_are_complete):
                to_process.add(dep_node)
            else:
                current_dependencies.append(dep_node)
        has_dependencies = current_dependencies

        sleep(seconds_to_sleep)


if __name__ == '__main__':
    main()
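Both job wrappers share the same poll-file contract: run the merge command, then write its exit status to the node's poll file, which job_complete later reads. A minimal sketch of what each wrapper produces (hypothetical paths; the commands are only printed, not run):

from lib_parallel_merge_otu_tables import local_job, torque_job

cmd = 'python merge_otu_tables.py -i A.biom,B.biom -o A_B.biom'

# local execution: run the command, then record its exit status
print local_job(cmd, 'A_B.biom.poll', 'A_B')
# -> python merge_otu_tables.py ... ; echo $? > A_B.biom.poll

# cluster execution: the same command piped into a Torque qsub submission
print torque_job(cmd, 'A_B.biom.poll', 'A_B')
# -> echo "... ; echo $? > A_B.biom.poll" | qsub -k oe -N A_B -q memroute ...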
Unit tests for lib_parallel_merge_otu_tables.py
#!/usr/bin/env python

__author__ = "Daniel McDonald"
__copyright__ = "Copyright 2011, The QIIME Project"
__credits__ = ["Daniel McDonald", "Greg Caporaso"]
__license__ = "GPL"
__version__ = "1.5.0-dev"
__maintainer__ = "Daniel McDonald"
__email__ = "[email protected]"
__status__ = "Development"

import os

from cogent.util.unit_test import TestCase, main

from lib_parallel_merge_otu_tables import (mergetree, mergeorder,
                                           initial_nodes_to_merge,
                                           initial_has_dependencies,
                                           job_complete, torque_job,
                                           local_job, start_job, JobError)


class MergeTests(TestCase):
    def setUp(self):
        pass

    def test_mergetree(self):
        """construct a merge subtree with various properties set"""
        exp = "(A,B)A_B;"
        obs = mergetree(['A.biom'], ['B.biom'], 'foo')
        self.assertEqual(obs.getNewick(escape_name=False), exp)

        self.assertEqual(obs.Children[0].Name, 'A')
        self.assertEqual(obs.Children[0].FilePath, 'A.biom')
        self.assertEqual(obs.Children[0].Processed, False)
        self.assertEqual(obs.Children[0].PollPath, None)
        self.assertEqual(obs.Children[0].FullCommand, None)

        self.assertEqual(obs.Children[1].Name, 'B')
        self.assertEqual(obs.Children[1].FilePath, 'B.biom')
        self.assertEqual(obs.Children[1].Processed, False)
        self.assertEqual(obs.Children[1].PollPath, None)
        self.assertEqual(obs.Children[1].FullCommand, None)

        self.assertEqual(obs.Name, 'A_B')
        self.assertEqual(obs.FilePath, 'foo/A_B.biom')
        self.assertEqual(obs.Processed, False)
        self.assertEqual(obs.PollPath, 'foo/A_B.biom.poll')
        self.assertEqual(obs.FullCommand, None)

    def test_mergeorder(self):
        """recursively build and join all the subtrees"""
        exp = "((A,B)A_B,(C,(D,E)D_E)C_D_E)A_B_C_D_E;"
        obs = mergeorder(['A', 'B', 'C', 'D', 'E'], 'foo')
        self.assertEqual(obs.getNewick(escape_name=False), exp)

    def test_initial_nodes_to_merge(self):
        """determine the first nodes to merge"""
        t = mergeorder(['A', 'B', 'C', 'D', 'E'], 'foo')
        exp = set([t.Children[0], t.Children[1].Children[1]])
        obs = initial_nodes_to_merge(t)
        self.assertEqual(obs, exp)

    def test_initial_has_dependencies(self):
        """determine initial has_dependencies"""
        t = mergeorder(['A', 'B', 'C', 'D', 'E'], 'foo')
        exp = [t, t.Children[1]]
        obs = initial_has_dependencies(t, initial_nodes_to_merge(t))
        self.assertEqual(obs, exp)

    def test_job_complete(self):
        """check if a job is complete"""
        t = mergeorder(['A', 'B', 'C', 'D', 'E'], 'foo')
        self.assertFalse(job_complete(t))
        self.assertFalse(job_complete(t.Children[0]))
        self.assertFalse(job_complete(t.Children[1].Children[1]))

        # tips cannot be "completed"
        self.assertRaises(JobError, job_complete, t.Children[0].Children[0])

        f = 'test_parallel_merge_otus_JOB_COMPLETE_TEST.poll'
        self.assertFalse(os.path.exists(f))

        testf = open(f, 'w')
        testf.write('0\n')
        testf.close()

        t.PollPath = f
        t.StartTime = 10

        self.assertTrue(job_complete(t))
        self.assertNotEqual(t.EndTime, None)
        self.assertNotEqual(t.TotalTime, None)

        # a nonzero exit status in the poll file raises
        testf = open(f, 'w')
        testf.write('1\n')
        testf.close()
        self.assertRaises(JobError, job_complete, t)

        t.Processed = False
        self.assertRaises(JobError, job_complete, t)

        os.remove(f)

    def test_torque_job(self):
        """wrap a torque job"""
        exp = 'echo "abc; echo $? > xyz" | qsub -k oe -N 123 -q memroute -l walltime=999:00:00 -l pvmem=64gb'
        obs = torque_job('abc', 'xyz', '123')
        self.assertEqual(obs, exp)

    def test_start_job(self):
        """start a job"""
        exp = 'echo "x y -i A.biom,B.biom -o foo/A_B.biom; echo $? > foo/A_B.biom.poll" | qsub -k oe -N A_B -q memroute -l walltime=999:00:00 -l pvmem=64gb'
        t = mergeorder(['A.biom', 'B.biom', 'C', 'D', 'E'], 'foo')
        start_job(t.Children[0], 'x', 'y', torque_job, False)
        self.assertEqual(t.Children[0].FullCommand, exp)

    def test_local_job(self):
        """fire off a local job"""
        exp = "abc; echo $? > xyz"
        obs = local_job('abc', 'xyz', 'notused')
        self.assertEqual(obs, exp)


if __name__ == '__main__':
    main()
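For a hands-on version of what test_job_complete verifies, the completion check can also be driven by writing a poll file directly. A minimal sketch, assuming the library is importable and using the current directory as a stand-in working dir:

import os
from time import time

from lib_parallel_merge_otu_tables import mergeorder, job_complete

t = mergeorder(['A.biom', 'B.biom'], '.')
t.StartTime = time()

# simulate a merge that exited with status 0
open(t.PollPath, 'w').write('0\n')
print job_complete(t)  # True; EndTime and TotalTime are stamped as well

os.remove(t.PollPath)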