Skip to content

Instantly share code, notes, and snippets.

@mpkocher
Last active August 29, 2015 13:56
Show Gist options
  • Save mpkocher/9215388 to your computer and use it in GitHub Desktop.
Save mpkocher/9215388 to your computer and use it in GitHub Desktop.
P_Control.py: disables the summaryCSV task (which calls summarizeCompareByMovie.py) to handle expired SMRT Cell lots.

Updating 2.1

Replace $SEYMOUR_HOME/analysis/lib/python2.7/pbpy-0.1-py2.7.egg/pbpy/smrtpipe/modules/P_Control.py in SMRTAnalysis 2.1 with the file below.

#
# $Id: //depot/branches/springfield/S2.1/software/assembly/pbpy/pbpy/smrtpipe/modules/P_Control.py#1 $
#
"""
v1.0 version of a module which aligns a set of reads to a spike-in control
sequence.
"""
import os
import logging
from pbpy.io.ReferenceEntry import ReferenceEntry
from pbpy.smrtpipe.engine.SmrtPipeTasks import task
from pbpy.smrtpipe.engine.DistributableTasks import DistributableTask
from pbpy.smrtpipe.modules.P_Aligner import P_Aligner
from pbpy.smrtpipe.engine.SmrtPipeFiles import (SMRTDataFile, MovieFOFN,
verifySyncedFofn)
from pbpy.smrtpipe.modules.P_Fetch import inputPlsFofn
from pbpy.smrtpipe.engine.common import USE_GLOBAL_NPROC
from pbpy.smrtpipe.cluster.Scatter import ScatterFofn, NoOpScatter
from pbpy.smrtpipe.cluster.Gather import GatherCmpH5, GatherSyncedFofn
from pbpy.smrtpipe.modules.P_Filter import filteredRgnFofn
from pbpy.smrtpipe.core.utils import toSmrtVersion
## Generated by P_Control ###################################################
#
# Before defining the module we define the set of
# files created by the tasks in this module.
#
# cmp.h5 of the control-read alignments; output of the `align` task.
controlCmpH5 = SMRTDataFile(
    "data/control_reads.cmp.h5",
    group="Diagnostic",
    dataItem="Control Reads",
    format="cmp.h5")

# Region FOFN written by the `updateRgn` task.  A verify function built from
# inputPlsFofn is attached to it.
noControlRgnFofn = MovieFOFN("data/post_control_regions.fofn")
noControlRgnFofn.addVerifyFunction(verifySyncedFofn(inputPlsFofn))

# CSV written by the `summaryCSV` task.
controlSummaryCSV = SMRTDataFile(
    "data/control_results_by_movie.csv",
    group="Diagnostic",
    dataItem="Control Summary",
    format="csv")

# Module-level logger.
log = logging.getLogger(__name__)

# Version bookkeeping: the "$...$" strings are revision-control keywords and
# are combined with the major version by toSmrtVersion.
_MAJOR_VERSION = "2.2"
_rev = "$Revision: #1 $"
_version = "$Change: 127046 $"
__version__ = toSmrtVersion(_MAJOR_VERSION, _version)
class P_Control( P_Aligner ):
    """Will align a set of reads to a spike-in control sequence and output them
    as contigs.

    Tasks defined here:
      * align      -- produces controlCmpH5
      * updateRgn  -- produces noControlRgnFofn from controlCmpH5
      * summaryCSV -- produces controlSummaryCSV (declared with enabled=False)
    """
    VERSION = __version__

    def validateSettings( self ):
        """Extract relevant settings from the context and store as attributes
        of this module, setting defaults and validating where necessary.

        Returns the error list obtained from P_Aligner.validateSettings,
        extended with an entry when the spike-in control file cannot be found.
        When the context has no 'control' entry, a warning is logged and
        self.disableTasks() is called instead.
        """
        errors = P_Aligner.validateSettings( self )
        self.reentrant = True
        # NOTE: the default is the *string* 'False'; the raw value is stored
        # unchanged and interpreted as a boolean where it is consumed
        # (see summaryCSV).
        self.pbinternal = self.setting('pbinternal', 'False')
        # for unrolled control sequences we need to mandate aligning the
        # whole read (not the subreads)
        # (this might change if we ever switch to library-based controls)
        self.useSubreads = False
        self.filterAdapterOnly = True
        # Validate the spike-in control reference
        if 'control' in self._context:
            self.refFile = self._context.getFile('control')
            if not os.path.exists(self.refFile):
                errors.append("Can't find spike-in control file %s" % self.refFile)
            else:
                # Only build the reference entry for a file that exists, so
                # that a missing file is reported through `errors` rather
                # than through whatever ReferenceEntry does with a bad path.
                self.reference = ReferenceEntry(self.refFile)
        else:
            msg = "DISABLING {c} module and all tasks in {c}. No control sequence identifier specified (parameter 'global.control')"
            log.warning(msg.format(c=self.__class__.__name__))
            self.disableTasks()
        return errors

    #
    # The nproc parameter controls how many processors/slots will be reserved for
    # each instance of this task in a Distributed setting. The value USE_GLOBAL_NPROC
    # will use the value set in smrtpipe.rc.
    #
    # When specifying inputs, a tuple of files will be resolved when the initial
    # graph is created. The tuple is resolved to the left-most file in the tuple
    # that is generated by another task in the graph. For example, in the align task
    # below, 'rgnFofn' will resolve to filteredRgnFofn if P_Filter is run, otherwise
    # it will default to inputPlsFofn.
    #
    @task( inputs = { 'plsFofn' : inputPlsFofn,
                      'rgnFofn' : ( filteredRgnFofn, inputPlsFofn ) },
           outputs = { 'cmpH5' : controlCmpH5 },
           nproc = USE_GLOBAL_NPROC,
           taskType = DistributableTask,
           scatters = [ ScatterFofn( inputPlsFofn ),
                        ScatterFofn(( filteredRgnFofn, inputPlsFofn )) ],
           gathers = [ GatherCmpH5( controlCmpH5 ) ] )
    def align( self, files ):
        """Return the sequence of steps for the control alignment task.

        The steps are: the result of self._align(files), two shell commands
        (an echo and `date`), and the result of self._loadPulses(files).
        _align and _loadPulses are not defined in this class (presumably
        inherited from P_Aligner -- confirm).
        """
        return ( self._align( files ),
                 "echo 'Alignment Complete'", "date",
                 self._loadPulses( files ) )

    #
    # In some cases the tasks can be engineered such that a scatter
    # or gather will never be run. In that case, feel free to make
    # use of the provided NoOpScatter and NoOpGather.
    #
    @task( inputs = { 'cmpH5' : controlCmpH5,
                      'rgnFofn' : ( filteredRgnFofn, inputPlsFofn ) },
           outputs = { 'noCtrlFofn': noControlRgnFofn },
           taskType = DistributableTask,
           scatters = [ ScatterFofn(( filteredRgnFofn, inputPlsFofn )),
                        NoOpScatter( controlCmpH5 ) ],
           gathers = [ GatherSyncedFofn( noControlRgnFofn, inputPlsFofn ) ] )
    def updateRgn( self, files ):
        """Pass forward the information required to ignore control reads in the downstream alignment.

        Returns the maskAlignedReads.py command line, built from the paths of
        the control cmp.h5, the input region FOFN and the output region FOFN.
        """
        return "maskAlignedReads.py %s %s %s" % \
            ( files.cmpH5.path, files.rgnFofn.path, files.noCtrlFofn.path )

    @task( inputs = { 'plsFofn' : inputPlsFofn,
                      'cmpH5wP' : controlCmpH5 },
           outputs = { 'summary' : controlSummaryCSV },
           enabled=False )
    def summaryCSV( self, files ):
        """Write out a CSV file summarizing the control alignment results.

        Returns the summarizeCompareByMovie.py command line, with its output
        redirected to the summary CSV path. '--external' is added unless the
        'pbinternal' setting is true.
        """
        # self.pbinternal may be the string 'False' (the default set in
        # validateSettings), which is truthy; interpret false-like values
        # explicitly so that '--external' is actually emitted by default.
        falseLike = ('', 'false', '0', 'no', 'off', 'none')
        isInternal = str(self.pbinternal).strip().lower() not in falseLike
        extOpt = '' if isInternal else '--external'
        return 'summarizeCompareByMovie.py %s --fofn=%s %s > %s' % \
            ( extOpt, files.plsFofn.path, files.cmpH5wP.path, files.summary.path )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment