Skip to content

Instantly share code, notes, and snippets.

@clintval
Created January 1, 2022 15:03
Show Gist options
  • Save clintval/7f0e22a1cbdd8dd595db95d6dc1c87eb to your computer and use it in GitHub Desktop.
Save clintval/7f0e22a1cbdd8dd595db95d6dc1c87eb to your computer and use it in GitHub Desktop.
Wrap the HTSJDK SAM locus iterator into something more ergonomic
package io.cvbio.sam
import com.fulcrumgenomics.FgBioDef.View
import com.fulcrumgenomics.bam._
import com.fulcrumgenomics.bam.api.{SamRecord, SamSource}
import htsjdk.samtools.SAMRecord
import htsjdk.samtools.filter.{DuplicateReadFilter, FailsVendorReadQualityFilter, SamRecordFilter, SecondaryAlignmentFilter}
import htsjdk.samtools.util.AbstractRecordAndOffset.AlignmentType.{Deletion, Insertion, Match}
import htsjdk.samtools.util.SamLocusIterator.LocusInfo
import htsjdk.samtools.util.{Interval, SamLocusIterator => HtsJdkSamLocusIterator}
import io.cvbio.coord.IntervalUtil
import io.cvbio.sam.SamLocusIterator.{MappedFilter, SupplementalAlignmentFilter}
import java.io.Closeable
import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters.{CollectionHasAsScala, IteratorHasAsScala, SeqHasAsJava}
// htsjdk.samtools.util.SamLocusIterator dontCheckQualities could be more efficient if it did a <= 0 instead of == 0.
class SamLocusIterator(
source: SamSource,
intervals: Seq[Interval] = Seq.empty,
emitUncoveredLoci: Boolean = true,
includeDuplicates: Boolean = false,
includeNonPfReads: Boolean = true,
includeSecondaryAlignments: Boolean = false,
includeSupplementalAlignments: Boolean = false,
mappedPairsOnly: Boolean = true,
maxReadsToAccumulate: Int = Int.MaxValue,
minBaseQ: Int = 20,
minMapQ: Int = 20,
) extends Closeable with View[Pileup[PileupEntry]] {
/** All base-level filters. */
private val baseFilters: Seq[PileupEntry => Boolean] = {
val buffer = new ListBuffer[PileupEntry => Boolean]()
if (minBaseQ > 0) buffer.addOne {
case entry: BaseEntry => entry.qual >= minBaseQ
case _ => true
}
buffer.toSeq
}
/** All record-level filters. */
private val recordFilters: Seq[SamRecord => Boolean] = {
val buffer = new ListBuffer[SamRecord => Boolean]()
if (minMapQ > 0) buffer += { r => r.mapq >= minMapQ }
if (mappedPairsOnly) buffer += { r => r.paired && r.mapped && r.mateMapped }
if (!includeDuplicates) buffer += { r => !r.duplicate }
if (!includeSecondaryAlignments) buffer += { r => !r.secondary }
if (!includeSupplementalAlignments) buffer += { r => !r.supplementary }
buffer.toSeq
}
/** The underlying set of SAM filters for the underlying [[HtsJdkSamLocusIterator]] for this [[SamLocusIterator]]. */
private val underlyingIteratorFilters = {
val buffer = new ListBuffer[SamRecordFilter]()
if (includeDuplicates) buffer.addOne(new DuplicateReadFilter)
if (includeNonPfReads) buffer.addOne(new FailsVendorReadQualityFilter)
if (includeSecondaryAlignments) buffer.addOne(new SecondaryAlignmentFilter)
if (includeSupplementalAlignments) buffer.addOne(new SupplementalAlignmentFilter)
if (mappedPairsOnly) buffer.addOne(new MappedFilter)
buffer.toSeq
}
/** The underlying [[HtsJdkSamLocusIterator]] for this [[SamLocusIterator]]. */
private val locusIterator: HtsJdkSamLocusIterator = {
val intervalList = Option.when(intervals.nonEmpty)(IntervalUtil.intoHtsJdk(intervals, dict = source.dict))
val iterator = new HtsJdkSamLocusIterator(source.toSamReader, intervalList.orNull)
iterator.setEmitUncoveredLoci(emitUncoveredLoci)
iterator.setIncludeNonPfReads(includeNonPfReads)
iterator.setMappingQualityScoreCutoff(minMapQ)
iterator.setMaxReadsToAccumulatePerLocus(maxReadsToAccumulate)
iterator.setQualityScoreCutoff(minBaseQ)
iterator.setSamFilters(underlyingIteratorFilters.asJava)
iterator
}
/** Iterate over the pileup of records one locus at a time. */
override def iterator: Iterator[Pileup[PileupEntry]] = {
locusIterator.asScala.map { locus: LocusInfo =>
val pile = locus.getRecordAndOffsets.asScala.flatMap { recAndOffset =>
val offset = recAndOffset.getOffset
val record = recAndOffset.getRecord.asInstanceOf[SamRecord]
lazy val entry = recAndOffset.getAlignmentType match {
case Match => BaseEntry(rec = record, offset = offset)
case Deletion => DeletionEntry(rec = record, offset = offset)
case Insertion => InsertionEntry(rec = record, offset = offset)
}
Option.unless(recordFilters.exists(fn => fn(record)) || baseFilters.exists(fn => fn(entry)))(entry)
}
Pileup(refName = locus.getSequenceName, refIndex = locus.getSequenceIndex, pos = locus.getPosition, pile = pile.toSeq)
}
}
/** Close the wrapped SAM source and the underlying HTSJDK locus iterator. */
override def close(): Unit = {
source.close()
locusIterator.close()
}
}
/** Companion object for [[SamLocusIterator]]. */
object SamLocusIterator {
// TODO: Consider a PR into HTSJDK so they can have this filter for the sake of completeness.
/** SAM record filter to filter out records or pairs where one or both of them are unmapped. */
private[sam] class MappedFilter extends SamRecordFilter {
/** Filter out the record if it is a supplemental alignment. */
override def filterOut(record: SAMRecord): Boolean = record.getReadUnmappedFlag
/** Filter out both records if either is a supplemental alignment. */
override def filterOut(first: SAMRecord, second: SAMRecord): Boolean = filterOut(first) || filterOut(second)
}
// TODO: Consider a PR into HTSJDK so they can have this filter for the sake of completeness.
/** SAM record filter to filter out records or pairs where any are supplemental alignments since HTSJDK doesn't have one. */
private[sam] class SupplementalAlignmentFilter extends SamRecordFilter {
/** Filter out the record if it is a supplemental alignment. */
override def filterOut(record: SAMRecord): Boolean = record.getSupplementaryAlignmentFlag
/** Filter out both records if either is a supplemental alignment. */
override def filterOut(first: SAMRecord, second: SAMRecord): Boolean = filterOut(first) || filterOut(second)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment