Skip to content

Instantly share code, notes, and snippets.

@gregcaporaso
Created October 10, 2012 16:17
Show Gist options
  • Save gregcaporaso/3866662 to your computer and use it in GitHub Desktop.
Script for running merge_otu_tables.py in parallel

Code for merging OTU tables in parallel. This code was written by Daniel McDonald.

#!/usr/bin/env python
__author__ = "Daniel McDonald"
__copyright__ = "Copyright 2011, The QIIME Project"
__credits__ = ["Daniel McDonald", "Greg Caporaso"]
__license__ = "GPL"
__version__ = "1.5.0-dev"
__maintainer__ = "Daniel McDonald"
__email__ = "[email protected]"
__status__ = "Development"
from os.path import basename, join
from time import time
from cogent.core.tree import TreeNode
from os import system
import os
class JobError(Exception):
    """Raised when a merge job is in an invalid state or exited nonzero."""
def _decorate_tip(item):
    """Wrap a one-element filepath sequence (e.g. ['A.biom']) as a tip TreeNode.

    The tip carries the bookkeeping attributes used by the merge driver;
    poll/command/timing fields stay None because tips are never executed.
    """
    filepath = str(item[0])
    # 'A.biom' -> 'A'; split on '.' first, then strip any directory part
    name = basename(filepath.split('.')[0])
    tip = TreeNode(Name=name)
    tip.FilePath = filepath
    tip.Processed = False
    tip.PollPath = None  # doesn't make sense for tips
    tip.FullCommand = None
    tip.EndTime = None
    tip.StartTime = None
    tip.TotalTime = None
    return tip

def mergetree(left, right, working_dir):
    """Reconstruct a tree from merge order.

    left/right are either already-built TreeNode subtrees or one-element
    filepath sequences; the latter are decorated as tips. Returns a new
    internal node whose FilePath is the merged BIOM table destination and
    whose PollPath is the exit-status file its job will write.
    """
    # decorate and infer filenames for tips
    if not isinstance(left, TreeNode):
        left = _decorate_tip(left)
    if not isinstance(right, TreeNode):
        right = _decorate_tip(right)
    # internal node: name concatenates the children's names
    name = '_'.join([left.Name, right.Name])
    filepath = join(working_dir, name) + '.biom'
    merged = TreeNode(Name=name, Children=[left, right])
    merged.FilePath = filepath
    merged.Processed = False
    merged.PollPath = filepath + '.poll'
    merged.FullCommand = None
    merged.EndTime = None
    merged.StartTime = None
    merged.TotalTime = None
    return merged
def mergeorder(items, working_dir):
    """Recursively build a balanced merge-order tree over *items*.

    Code taken from http://en.literateprograms.org/Merge_sort_(Python)

    A list of fewer than two items is returned as-is (it becomes a tip in
    mergetree); otherwise the halves are merged bottom-up.
    """
    if len(items) < 2:
        return items
    # floor division: behavior-identical under Python 2 and correct under
    # Python 3 / future division, where `/` would yield a float index
    middle = len(items) // 2
    left = mergeorder(items[:middle], working_dir)
    right = mergeorder(items[middle:], working_dir)
    return mergetree(left, right, working_dir)
def initial_nodes_to_merge(tree):
    """Determine what nodes are safe to process first.

    The first nodes to process are those internal nodes whose children are
    all tips: a tip whose siblings are all tips marks its Parent as ready.
    """
    ready = set()
    for tip in tree.tips():
        if all(sib.istip() for sib in tip.siblings()):
            ready.add(tip.Parent)
    return ready
def initial_has_dependencies(tree, to_process):
    """All nodes that aren't processed up front still have dependencies.

    to_process : set of nodes that are in the first round of processing
    """
    return [n for n in tree.nontips(include_self=True)
            if n not in to_process]
def job_complete(node, verbose=False):
    """Check if the merge job for *node* is complete.

    Returns False while the poll file does not yet exist. Once it appears,
    records EndTime/TotalTime/ExitStatus on the node, marks it Processed,
    and returns True.

    Raises JobError if called on a tip (or a node with no PollPath), on an
    already-processed node, or when the recorded exit status is nonzero.
    """
    if node.PollPath is None or node.istip():
        # modern call-form raise: identical behavior, also valid on Python 3
        raise JobError("Attempting to merge tip: %s" % node.Name)
    if node.Processed:
        raise JobError("Already processed node: %s" % node.Name)
    if not os.path.exists(node.PollPath):
        # job still running (the wrapped command writes PollPath on exit)
        return False
    node.EndTime = time()
    node.TotalTime = node.EndTime - node.StartTime
    # the poll file holds the shell exit status of the wrapped command;
    # close the handle explicitly instead of leaking it
    with open(node.PollPath) as poll:
        node.ExitStatus = poll.read().strip()
    if node.ExitStatus != '0':
        raise JobError("Node %s did not complete correctly!" % node.Name)
    if verbose:
        print("finishing %s, %f seconds" % (node.Name, node.TotalTime))
    node.Processed = True
    return True
def torque_job(cmd, pollpath, name, queue="memroute", mem="64gb",
               walltime="999:00:00"):
    """Wrap *cmd* in a qsub submission line.

    The wrapped command writes its shell exit status to *pollpath* so the
    driver can poll for completion.
    """
    submit_args = "qsub -k oe -N %s -q %s -l walltime=%s -l pvmem=%s" % (
        name, queue, walltime, mem)
    return 'echo "%s; echo $? > %s" | %s' % (cmd, pollpath, submit_args)
def local_job(cmd, pollpath, name):
    """Make a local (non-queued) job: run *cmd*, write exit status to *pollpath*.

    *name* is accepted for interface compatibility with torque_job but unused.
    """
    return '%s; echo $? > %s' % (cmd, pollpath)
def start_job(node, python_exe_fp, merge_otus_fp, wrap_call=torque_job, submit=True):
    """Build (and optionally launch via os.system) the merge command for a node.

    Records the wrapped command on node.FullCommand and stamps node.StartTime.
    When submit is False the command is only constructed, not executed.
    """
    base_cmd = "%(Python)s %(MergeOTUs)s -i %(BIOM_A)s,%(BIOM_B)s -o %(Output)s"
    fields = {'Python': python_exe_fp,
              'MergeOTUs': merge_otus_fp,
              'Output': node.FilePath,
              'BIOM_A': node.Children[0].FilePath,
              'BIOM_B': node.Children[1].FilePath}
    full_cmd = wrap_call(base_cmd % fields, node.PollPath, node.Name)
    if submit:
        system(full_cmd)
    node.FullCommand = full_cmd
    node.StartTime = time()
#!/usr/bin/env python
from __future__ import division
__author__ = "Daniel McDonald"
__copyright__ = "Copyright 2011, The QIIME Project"
__credits__ = ["Daniel McDonald", "Greg Caporaso"]
__license__ = "GPL"
__version__ = "1.5.0-dev"
__maintainer__ = "Daniel McDonald"
__email__ = "[email protected]"
__status__ = "Development"
from random import choice
from qiime.util import make_option
from qiime.util import parse_command_line_parameters,\
load_qiime_config, get_options_lookup, get_qiime_scripts_dir
from os import popen, system, makedirs, mkdir
from os.path import split, splitext, join
from subprocess import check_call, CalledProcessError
from qiime.util import get_tmp_filename
from cogent.core.tree import TreeNode
from time import sleep, time
from lib_parallel_merge_otu_tables import start_job, local_job, torque_job, \
job_complete, initial_has_dependencies, initial_nodes_to_merge, \
mergeorder, mergetree
# Load site configuration and the shared option definitions used by QIIME
# command-line scripts.
qiime_config = load_qiime_config()
options_lookup = get_options_lookup()
# script_info is the standard QIIME script-interface description; it is
# consumed by parse_command_line_parameters(**script_info) in main().
script_info={}
script_info['brief_description']="""Parallel merge BIOM tables"""
script_info['script_description']="""This script works like the merge_otu_tables.py script, but is intended to make use of multicore/multiprocessor environments to perform analyses in parallel."""
script_info['script_usage']=[]
script_info['script_usage'].append(("""Example""","""Merge the OTU tables $PWD/my_otu_table_1.biom,$PWD/my_otu_table_2.biom,$PWD/my_otu_table_3.biom,$PWD/my_otu_table_4.biom and write the resulting output table to the $PWD/merged_table/ directory. ALWAYS SPECIFY ABSOLUTE FILE PATHS (absolute path represented here as $PWD, but will generally look something like /home/ubuntu/my_analysis/).""","""%prog -i $PWD/table1.biom,$PWD/table2.biom -o $PWD/merged_table/"""))
script_info['output_description']="""The output consists of many files (i.e. merged_table.biom, merged_table.log and all intermediate merge tables). The .biom file contains the result of merging the individual BIOM tables. The resulting .log file contains a list of parameters passed to this script along with the output location of the resulting .txt file, the dependency hierarchy and runtime information for each individual merge."""
# -i (input tables) and -o (output path) are mandatory; path validation is
# delegated to the option types.
script_info['required_options'] = [\
make_option('-i','--input_fps',type='existing_filepaths',
help='the otu tables in biom format (comma-separated)'),\
make_option('-o','--output_fp',type='new_filepath',
help='the output otu table filepath')]
# Optional flags: -N overrides the merge_otu_tables.py location; the rest
# reuse shared QIIME option definitions (python exe, polling interval,
# job name prefix).
script_info['optional_options'] = [\
make_option('-N','--merge_otus_fp',action='store',\
type='existing_filepath',help='full path to '+\
'scripts/merge_otu_tables.py [default: %default]',\
default=join(get_qiime_scripts_dir(),'merge_otu_tables.py')),\
options_lookup['python_exe_fp'],
options_lookup['seconds_to_sleep'],
options_lookup['job_prefix']]
script_info['version'] = __version__
def get_random_job_prefix(prefix):
    """Return *prefix* followed by five random ASCII letters.

    Used to give each run a unique working-directory / job name.
    """
    # stdlib alphabet instead of a hand-rolled upper+lower string
    from string import ascii_letters
    return prefix + ''.join([choice(ascii_letters) for _ in range(5)])
def main():
    """Drive the parallel merge: build the dependency tree, then repeatedly
    submit ready jobs and poll for completions until the root table exists.
    """
    option_parser, opts, args =\
        parse_command_line_parameters(**script_info)
    input_fps = opts.input_fps
    python_exe_fp = opts.python_exe_fp
    #cluster_jobs_fp = opts.cluster_jobs_fp
    #jobs_to_start = opts.jobs_to_start
    output_fp = opts.output_fp
    merge_otus_fp = opts.merge_otus_fp
    #poller_fp = opts.poller_fp
    #retain_temp_files = opts.retain_temp_files
    #suppress_polling = opts.suppress_polling
    seconds_to_sleep = opts.seconds_to_sleep
    #poll_directly = opts.poll_directly
    verbose = opts.verbose
    created_temp_paths = []
    # set the job_prefix either based on what the user passed in,
    # or a random string beginning with MOTU
    job_prefix = opts.job_prefix or get_random_job_prefix('MOTU')
    # A temporary output directory is created in output_dir named
    # job_prefix. Output files are then moved from the temporary
    # directory to the output directory when they are complete, allowing
    # a poller to detect when runs complete by the presence of their
    # output files.
    working_dir = '%s/%s' % (output_fp,job_prefix)
    try:
        makedirs(working_dir)
    except OSError:
        # working dir already exists
        pass
    import os.path
    # wrapper log output contains run details
    log_fp = os.path.join(working_dir, 'parallel_merge_otus.log')
    #log_fp = 'parallel_merge_otus.log'
    # refuse to clobber a log from a previous run in the same working dir
    if os.path.exists(log_fp):
        raise IOError,"log file already exists!"
    wrapper_log_output = open(log_fp, 'w')
    wrapper_log_output.write("Parallel merge output\n\n")
    # munge input filenames intentionally, output munge
    #filenames = munge_filenames(input_fps)
    #wrapper_log_output.write("Munge file mapping:\n")
    #for m,f in zip(filenames,input_fps):
    #    wrapper_log_output.write('\t'.join([m,f]))
    #    wrapper_log_output.write('\n')
    #wrapper_log_output.write('\n')
    #wrapper_log_output.flush()
    # construct the dependency tree
    import os
    tree = mergeorder(input_fps, working_dir)
    if verbose:
        print tree.asciiArt()
    wrapper_log_output.write('Dependency tree:\n')
    wrapper_log_output.write(tree.asciiArt())
    wrapper_log_output.write('\n\n')
    wrapper_log_output.flush()
    # nodes whose children are all tips can start immediately; the rest wait
    to_process = initial_nodes_to_merge(tree)
    has_dependencies = initial_has_dependencies(tree, to_process)
    # loop until the whole shabang is done
    pending = [] # jobs that are currently running
    while not tree.Processed:
        # check if we have nodes to process, if so, shoot them off
        for node in to_process:
            start_job(node, python_exe_fp, merge_otus_fp, wrap_call=local_job)
            wrapper_log_output.write(node.FullCommand)
            wrapper_log_output.write('\n')
            wrapper_log_output.flush()
            pending.append(node)
        to_process = set([])
        # check running jobs
        current_pending = []
        for pending_node in pending:
            # if we're complete, update state
            if job_complete(pending_node):
                wrapper_log_output.write("Node %s completed in %f seconds" % \
                        (pending_node.Name, pending_node.TotalTime))
                wrapper_log_output.write('\n')
                wrapper_log_output.flush()
            else:
                current_pending.append(pending_node)
        pending = current_pending
        # check for new jobs to add
        current_dependencies = []
        for dep_node in has_dependencies:
            # if children are satisfied, then allow for processing
            # the logic here is odd to handle the case where an internal node
            # has both a tip that is a child and child that is an internal node
            children_are_complete = [(c.Processed or c.istip()) for c in dep_node.Children]
            if all(children_are_complete):
                to_process.add(dep_node)
            else:
                current_dependencies.append(dep_node)
        has_dependencies = current_dependencies
        # avoid busy-waiting between polls
        sleep(seconds_to_sleep)
if __name__ == '__main__':
    main()
#!/usr/bin/env python
__author__ = "Daniel McDonald"
__copyright__ = "Copyright 2011, The QIIME Project"
__credits__ = ["Daniel McDonald", "Greg Caporaso"]
__license__ = "GPL"
__version__ = "1.5.0-dev"
__maintainer__ = "Daniel McDonald"
__email__ = "[email protected]"
__status__ = "Development"
from cogent.util.unit_test import TestCase, main
from lib_parallel_merge_otu_tables import mergetree, mergeorder, \
initial_nodes_to_merge, initial_has_dependencies, job_complete, \
torque_job, local_job, start_job, JobError
import os
class MergeTests(TestCase):
    """Unit tests for the parallel merge_otu_tables support library."""
    def setUp(self):
        pass
    def test_mergetree(self):
        """construct a merge subtree with various properties set"""
        exp = "(A,B)A_B;"
        obs = mergetree(['A.biom'],['B.biom'],'foo')
        self.assertEqual(obs.getNewick(escape_name=False), exp)
        # tips carry the source filepath but no poll/command state
        self.assertEqual(obs.Children[0].Name, 'A')
        self.assertEqual(obs.Children[0].FilePath, 'A.biom')
        self.assertEqual(obs.Children[0].Processed, False)
        self.assertEqual(obs.Children[0].PollPath, None)
        self.assertEqual(obs.Children[0].FullCommand, None)
        self.assertEqual(obs.Children[1].Name, 'B')
        self.assertEqual(obs.Children[1].FilePath, 'B.biom')
        self.assertEqual(obs.Children[1].Processed, False)
        self.assertEqual(obs.Children[1].PollPath, None)
        self.assertEqual(obs.Children[1].FullCommand, None)
        # the internal node gets a derived name, output path and poll path
        self.assertEqual(obs.Name, 'A_B')
        self.assertEqual(obs.FilePath, 'foo/A_B.biom')
        self.assertEqual(obs.Processed, False)
        self.assertEqual(obs.PollPath, 'foo/A_B.biom.poll')
        self.assertEqual(obs.FullCommand, None)
    def test_mergeorder(self):
        """recursively build and join all the subtrees"""
        exp = "((A,B)A_B,(C,(D,E)D_E)C_D_E)A_B_C_D_E;"
        obs = mergeorder(['A','B','C','D','E'],'foo')
        self.assertEqual(obs.getNewick(escape_name=False), exp)
    def test_initial_nodes_to_merge(self):
        """determine the first nodes to merge"""
        t = mergeorder(['A','B','C','D','E'],'foo')
        # only internal nodes whose children are all tips are ready
        exp = set([t.Children[0], t.Children[1].Children[1]])
        obs = initial_nodes_to_merge(t)
        self.assertEqual(obs,exp)
    def test_initial_has_dependencies(self):
        """determine initial has_dependencies"""
        t = mergeorder(['A','B','C','D','E'],'foo')
        exp = [t,t.Children[1]]
        obs = initial_has_dependencies(t, initial_nodes_to_merge(t))
        self.assertEqual(obs, exp)
    def test_job_complete(self):
        """check if a job is complete"""
        t = mergeorder(['A','B','C','D','E'],'foo')
        # no poll files exist yet -> nothing is complete
        self.assertFalse(job_complete(t))
        self.assertFalse(job_complete(t.Children[0]))
        self.assertFalse(job_complete(t.Children[1].Children[1]))
        # asking about a tip is an error
        self.assertRaises(JobError, job_complete, t.Children[0].Children[0])
        f = 'test_parallel_merge_otus_JOB_COMPLETE_TEST.poll'
        self.assertFalse(os.path.exists(f))
        # a poll file containing exit status 0 marks the job complete
        testf = open(f,'w')
        testf.write('0\n')
        testf.close()
        t.PollPath = f
        t.StartTime = 10
        self.assertTrue(job_complete(t))
        self.assertNotEqual(t.EndTime, None)
        self.assertNotEqual(t.TotalTime, None)
        # nonzero exit status raises, as does re-checking a processed node
        testf = open(f,'w')
        testf.write('1\n')
        testf.close()
        self.assertRaises(JobError, job_complete, t)
        t.Processed = False
        self.assertRaises(JobError, job_complete, t)
        os.remove(f)
    def test_torque_job(self):
        """wrap a torque job"""
        exp = 'echo "abc; echo $? > xyz" | qsub -k oe -N 123 -q memroute -l walltime=999:00:00 -l pvmem=64gb'
        obs = torque_job('abc','xyz','123')
        self.assertEqual(obs,exp)
    def test_start_job(self):
        """start a job"""
        # submit=False: command is constructed and recorded but not executed
        exp = 'echo "x y -i A.biom,B.biom -o foo/A_B.biom; echo $? > foo/A_B.biom.poll" | qsub -k oe -N A_B -q memroute -l walltime=999:00:00 -l pvmem=64gb'
        t = mergeorder(['A.biom','B.biom','C','D','E'],'foo')
        start_job(t.Children[0], 'x','y',torque_job,False)
        self.assertEqual(t.Children[0].FullCommand, exp)
    def test_local_job(self):
        """fire off a local job"""
        exp = "abc; echo $? > xyz"
        obs = local_job('abc','xyz','notused')
        self.assertEqual(obs,exp)
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment