Created
December 12, 2014 17:45
-
-
Save zachcp/c45433307a292513af41 to your computer and use it in GitHub Desktop.
Trim Adaptor Script Modified for Parallel Processing using the Multiprocessing module
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Modified from: | |
https://github.com/chapmanb/bcbb/blob/master/align/adaptor_trim.py | |
See Also: | |
https://bcbio.wordpress.com/2009/08/09/trimming-adaptors-from-short-read-sequences/ | |
Trim adaptor sequences from reads; designed for short read sequencing. | |
Allows trimming of adaptor sequences from a list of SeqRecords produced | |
by the Biopython SeqIO library. | |
This can be imported for use in other scripts, or can be run directly. Running | |
the script with no arguments will run the tests. Run directly, it will convert a | |
fastq to a fasta output file, trimming with the passed adaptor: | |
""" | |
from __future__ import with_statement | |
import sys | |
import os | |
import multiprocessing | |
from functools import partial | |
from Bio import pairwise2 | |
from Bio.Seq import Seq | |
from Bio import SeqIO | |
from Bio.SeqIO.QualityIO import FastqGeneralIterator | |
import click | |
@click.command() | |
@click.option('--in_file', type=click.STRING) | |
@click.option('--out_file', type=click.STRING) | |
@click.option('--adaptor_seq', type=click.STRING) | |
@click.option('--num_errors', type=click.INT, default=1) | |
@click.option('--min_size', type=click.INT, default=1) | |
@click.option('--max_size', type=click.INT, default=None) | |
@click.option('--right_side', default=False) | |
@click.option('--ncpus', default=4) | |
def main(in_file, out_file, adaptor_seq, num_errors, min_size, max_size, right_side, ncpus): | |
num_errors = int(num_errors) | |
min_size = int(min_size) | |
max_size = int(max_size) if max_size else None | |
# create a pool of processing nodes | |
p = multiprocessing.Pool(ncpus) | |
# use partial to create a function needing only one argument | |
fastqfunc = partial(process_fastq, adaptor_seq=adaptor_seq, num_errors=num_errors,min_size=min_size, max_size=max_size,right_side=right_side) | |
#open and process the files by shelling out each fastq record to the pool | |
with open(in_file) as in_handle: | |
with open(out_file, "w") as out_handle: | |
fastqs = (rec for rec in FastqGeneralIterator(in_handle)) | |
results = p.imap(fastqfunc, fastqs) | |
for result in results: | |
out_handle.write(str(result)) | |
def process_fastq(fastqdata, adaptor_seq, num_errors, min_size, max_size,right_side): | |
title, seq, qual = fastqdata | |
cur_adaptor = (adaptor_seq[:(len(rec) - max_size)] if max_size | |
else adaptor_seq) | |
trim = trim_adaptor(seq, cur_adaptor, num_errors, right_side) | |
cur_max = max_size if max_size else len(seq) - 1 | |
if len(trim) >= min_size and len(trim) <= cur_max: | |
pos = seq.find(trim) | |
assert pos >= 0 | |
trim_qual = qual[pos:pos+len(trim)] | |
return "@%s\n%s\n+\n%s\n" % (title, trim, trim_qual) | |
def _remove_adaptor(seq, region, right_side=True): | |
"""Remove an adaptor region and all sequence to the right or left. | |
""" | |
# A check for repetitive regions. We handle them below by searching | |
# from the left or right depending on the trimming method which should | |
# be the expected result. | |
#size = len(region) | |
#pieces = [str(seq[i:i+size]) for i in range(len(seq) - size)] | |
#if pieces.count(region) != 1: | |
# raise ValueError("Non-single match: %s to %s" % (region, seq)) | |
if right_side: | |
try: | |
pos = seq.find(region) | |
# handle Biopython SeqRecords | |
except AttributeError: | |
pos = seq.seq.find(region) | |
return seq[:pos] | |
else: | |
try: | |
pos = seq.rfind(region) | |
# handle Biopython SeqRecords | |
except AttributeError: | |
pos = seq.seq.rfind(region) | |
return seq[pos+len(region):] | |
def trim_adaptor(seq, adaptor, num_errors, right_side): | |
"""Trim the given adaptor sequence from a starting sequence. | |
* seq can be either of: | |
- string | |
- Biopython SeqRecord | |
* adaptor is a string sequence | |
* num_errors specifies how many errors are allowed in the match between | |
adaptor and the base sequence. Matches with more than this number of errors | |
are not allowed. | |
""" | |
gap_char = '-' | |
exact_pos = str(seq).find(adaptor) | |
if exact_pos >= 0: | |
seq_region = str(seq[exact_pos:exact_pos+len(adaptor)]) | |
adapt_region = adaptor | |
else: | |
aligns = pairwise2.align.localms(str(seq), str(adaptor), | |
5.0, -4.0, -9.0, -0.5, one_alignment_only=True, | |
gap_char=gap_char) | |
if len(aligns) == 0: | |
adapt_region, seq_region = ("", "") | |
else: | |
seq_a, adaptor_a, score, start, end = aligns[0] | |
adapt_region = adaptor_a[start:end] | |
#print seq_a, adaptor_a, score, start, end | |
seq_region = seq_a[start:end] | |
matches = sum((1 if s == adapt_region[i] else 0) for i, s in | |
enumerate(seq_region)) | |
# too many errors -- no trimming | |
if (len(adaptor) - matches) > num_errors: | |
return seq | |
# remove the adaptor sequence and return the result | |
else: | |
return _remove_adaptor(seq, seq_region.replace(gap_char, ""), | |
right_side) | |
def trim_adaptor_w_qual(seq, qual, adaptor, num_errors, right_side): | |
"""Trim an adaptor with an associated quality string. | |
Works like trimmed adaptor, but also trims an associated quality score. | |
""" | |
assert len(seq) == len(qual) | |
tseq = trim_adaptor(seq, adaptor, num_errors, right_side=right_side) | |
if right_side: | |
pos = seq.find(tseq) | |
else: | |
pos = seq.rfind(tseq) | |
tqual = qual[pos:pos+len(tseq)] | |
assert len(tseq) == len(tqual) | |
return tseq, tqual | |
# ------- Testing Code | |
import unittest | |
from Bio.SeqRecord import SeqRecord | |
from Bio.Seq import Seq | |
from Bio.Alphabet.IUPAC import unambiguous_dna | |
class AdaptorAlignTrimTest(unittest.TestCase): | |
"""Test remove adaptor sequences using local alignments. | |
""" | |
def t_1_simple_trim(self): | |
"""Trim adaptor from non-complex region with errors and deletions. | |
""" | |
adaptor = "GATCGATCGATC" | |
tseq = trim_adaptor("GGG" + adaptor + "CCC", adaptor, 2) # exact | |
assert tseq == "GGG" | |
tseq = trim_adaptor("GGG" + "GATCGTTCGATC" + "CCC", adaptor, 2)# 1 error | |
assert tseq == "GGG" | |
tseq = trim_adaptor("GGG" + "GATCGTTCGAAC" + "CCC", adaptor, 2)# 2 errors | |
assert tseq == "GGG" | |
tseq = trim_adaptor("GGG" + "GATCGATCGTC" + "CCC", adaptor, 2) # deletion | |
assert tseq == "GGG" | |
tseq = trim_adaptor("GGG" + "GACGATCGTC" + "CCC", adaptor, 2) # deletion | |
assert tseq == "GGG" | |
tseq = trim_adaptor("GGG" + "GAACGTTGGATC" + "CCC", adaptor, 2)# 3 errors | |
assert tseq == "GGGGAACGTTGGATCCCC" | |
tseq = trim_adaptor("GGG" + "CATCGGACGTAT" + "CCC", adaptor, 2)# very bad | |
assert tseq == "GGGCATCGGACGTATCCC" | |
def t_2_alternative_side_trim(self): | |
"""Trim adaptor from both sides of the sequence. | |
""" | |
adaptor = "GATCGATCGATC" | |
tseq = trim_adaptor("GGG" + "GATCGTTCGATC" + "CCC", adaptor, 2) | |
assert tseq == "GGG" | |
tseq = trim_adaptor("GGG" + "GATCGTTCGATC" + "CCC", adaptor, 2, False) | |
assert tseq == "CCC" | |
def t_3_repetitive_sequence(self): | |
"""Trim a repetitive adaptor sequence. | |
""" | |
adaptor = "GATCGATC" | |
tseq = trim_adaptor("GGG" + "GATCGATCGATC" + "CCC", adaptor, 2) | |
assert tseq == "GGG" | |
tseq = trim_adaptor("GGG" + "GATCGATCGATC" + "CCC", adaptor, 2, False) | |
assert tseq == "CCC" | |
def t_4_passing_seqs(self): | |
"""Handle both Biopython Seq and SeqRecord objects. | |
""" | |
adaptor = "GATCGATCGATC" | |
seq = Seq("GGG" + "GATCGTTCGATC" + "CCC", unambiguous_dna) | |
tseq = trim_adaptor(seq, adaptor, 2) | |
assert isinstance(tseq, Seq) | |
assert tseq.alphabet == unambiguous_dna | |
tseq = trim_adaptor(seq=seq, adaptor=adaptor, num_errors=2) | |
assert isinstance(tseq, Seq) | |
assert tseq.alphabet == unambiguous_dna | |
trec = trim_adaptor(SeqRecord(seq, "test_id", "test_name", "test_d"), | |
adaptor, 2) | |
assert isinstance(trec, SeqRecord) | |
assert trec.id == "test_id" | |
assert str(trec.seq) == "GGG" | |
trec = trim_adaptor(SeqRecord(seq, "test_id", "test_name", "test_d"), | |
adaptor, 2, False) | |
assert isinstance(trec, SeqRecord) | |
assert str(trec.seq) == "CCC" | |
def t_5_passing_tuple(self): | |
"""Handle passing a tuple of sequence and quality as input. | |
""" | |
adaptor = "GATCGATCGATC" | |
seq = "GGG" + "GATCGTTCGATC" + "CCC" | |
qual = "YDV`a`a^[Xa`a`^`_O" | |
tseq, tqual = trim_adaptor_w_qual(seq, qual, adaptor, num_errors=2) | |
assert tseq == "GGG" | |
assert tqual == "YDV" | |
tseq, tqual = trim_adaptor_w_qual(seq, qual, adaptor, num_errors=2, | |
right_side=False) | |
assert tseq == "CCC" | |
assert tqual == "`_O" | |
def t_6_no_alignment(self): | |
"""Correctly handle case with no alignment between adaptor and sequence. | |
""" | |
adaptor = "AAAAAAAAAAAAAA" | |
to_trim = "TTTTTTTTTTTTTTTTT" | |
tseq = trim_adaptor(to_trim, adaptor, 2) | |
assert tseq == to_trim | |
def run_tests(argv): | |
test_suite = testing_suite() | |
runner = unittest.TextTestRunner(sys.stdout, verbosity = 2) | |
runner.run(test_suite) | |
def testing_suite(): | |
"""Generate the suite of tests. | |
""" | |
test_suite = unittest.TestSuite() | |
test_loader = unittest.TestLoader() | |
test_loader.testMethodPrefix = 't_' | |
tests = [AdaptorAlignTrimTest] | |
for test in tests: | |
cur_suite = test_loader.loadTestsFromTestCase(test) | |
test_suite.addTest(cur_suite) | |
return test_suite | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment