Created
June 22, 2019 14:44
-
-
Save MatthewRalston/6641f45bdce19341f568264132b794de to your computer and use it in GitHub Desktop.
Fasta + Bam file validation with S3 support
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import gzip | |
import io | |
import tempfile | |
from Bio import SeqIO, bgzf | |
import pysam | |
import boto3 | |
# Logger | |
import logging | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.DEBUG) | |
# Boto | S3 | |
s3 = boto3.resource('s3') | |
s3client = boto3.client('s3') | |
#stream = io.BytesIO(s3.get_object(Bucket=bucket, Key=key)['Body'].read()) # Type check some of this please | |
# with open("example.bam", 'rb') as ifile: | |
# with bgzf.BgzfReader(mode='r', fileobj=ifile) as bamfh: | |
# print(bamfh.readline().rstrip()) | |
alignment_suffixes = [".sam", ".bam", ".cram"] | |
s3prefix = "s3://" | |
class Reader: | |
def __init__(self, alignment: io.IOBase, reference: io.IOBase): | |
if not isinstance(alignment, io.IOBase): | |
raise TypeError("ncbi.reader.Reader expects a 'alignment' file as its first positional argument") | |
elif not isinstance(reference, io.IOBase): | |
raise TypeError("ncbi.reader.Reader expects a 'reference' Fasta file as its second positional argument") | |
alignment.close() | |
reference.close() | |
fasta_data = self.__validate_fasta(reference.name) | |
self.reference_file = fasta_data["filepath"] | |
self.__reference_is_temporary_file = fasta_data["temporary"] | |
self.reference_lengths = fasta_data["lengths"] | |
alignment_data = self.__validate_alignment(alignment.name, self.reference_lengths) | |
self.alignment_file = alignment_data["indexed"] | |
self.__alignment_is_temporary_file = alignment_data["temporary"] | |
self.alignment = pysam.AlignmentFile(self.alignment_file) | |
def __exit__(self, exc_type, exc_value, traceback): | |
self.alignment.close() | |
if self.__alignment_is_temporary_file: | |
for f in glob.glob(self.alignment_file + "*"): | |
logger.debug(" Unlinking alignment/index file '{0}'...".format(f)) | |
os.unlink(f) | |
if self.__reference_is_temporary_file: | |
logger.debug(" Unlinking reference file '{0}'...".format(self.reference_file)) | |
os.unlink(self.reference_file) | |
def __s3_file_download(self, seqpath): | |
""" | |
Note: the file will be downloaded into a temporary file that needs to be deleted afterwards | |
It will create the temporary file with respect to the TMP bash variable 'export TMP=/some/temporary/location' | |
:param seqpath: The s3 identifier of a object. 's3://bucket/example.fasta' | |
:type seqpath: str | |
:returns: The location of a downloaded gennomic Fasta file | |
:rtype: str | |
""" | |
if type(seqpath) is not str: | |
raise TypeError("ngsci.reader.__s3_file_download expects a str 'seqpath' as its first positional argument") | |
elif seqpath[0:5] != s3prefix: | |
raise TypeError("ngsci.reader.__s3_file_download expects a s3 object reference its first positional argument. e.g. 's3://bucket/example.txt'") | |
seqpath = seqpath.lstrip(s3prefix) | |
pathsegs =seqpath.split('/') | |
bucket = pathsegs.pop(0) | |
key = pathsegs.join('/') | |
if seqpath[-3:] == ".gz": | |
suffix = '.' + sepath.split('.')[-2:].join('.') | |
else: | |
suffix = os.path.splitext(seqpath)[1] | |
filepath = tempfile.NamedTemporaryFile(mode='w+b', suffix=suffix, delete=False) | |
logger.info("Downloading '{0}' => '{1}'...".format(seqpath, filepath.name)) | |
obj = s3.Object(bucket, key) | |
obj.download_fileobj(filepath) | |
filepath.close() | |
return filepath.name | |
def __validate_fasta(self, seqpath): | |
""" | |
Note: the function returns a NamedTemporaryFile if the input is an s3 object. This needs to be os.unlink'd | |
:param seqpath: | |
:returns: {lengths: {seqid: seqlen, ...}, filepath: 'path/to/example.fasta', temporary: False} Returns a dictionary of sequence objects, the filepath, and whether the | |
:rtype: | |
""" | |
if type(seqpath) is not str: | |
raise TypeError("ngsci.reader.__validate_fasta expects a str 'seqpath' as its first positional argument") | |
elif (seqpath[-3:] != ".fa" and seqpath[-6:] != ".fa.gz" and seqpath[-6:] != ".fasta" and seqpath[-9:] != ".fasta.gz"): | |
raise TypeError("ngsci.reader.__validate_fasta expects the filepath to have a '.fa', '.fasta', or gzipped version of these format") | |
result = { | |
"lengths": [], | |
"filepath": seqpath, | |
"temporary": False | |
} | |
if seqpath[0:5] == "s3://": | |
localfasta = self.__s3_file_download(seqpath) | |
result["filepath"] = localfasta | |
result["temporary"] = True | |
elif not os.path.exists(result["filepath"]) or not os.access(result["filepath"], os.R_OK): | |
raise os.error("ngsci.reader.read_fasta expects the file to be accessible on the filesystem") | |
try: | |
logger.debug("Reading reference sequence lengths from fasta file '{0}'...".format(result["filepath"])) | |
if seqpath[-3:] == ".gz": | |
with open(result["filepath"], 'rb') as ifile: | |
with gzip.open(ifile, mode='rt') as data: | |
result["lengths"] = dict([(x.id, len(x.seq)) for x in list(SeqIO.parse(data, 'fasta'))]) | |
else: # Uncompressed fasta | |
with open(result["filepath"], 'r') as ifile: | |
result["lengths"] = dict([(x.id, len(x.seq)) for x in list(SeqIO.parse(ifile, 'fasta'))]) | |
except Exception as e: | |
logger.error(e) | |
sys.exit(1) | |
return result | |
def __validate_alignment(self, seqpath, lengths): | |
if type(seqpath) is not str: | |
raise TypeError("ngsci.reader.__validate_alignment expects a str 'seqpath' as its first positional argument") | |
elif type(lengths) is not dict or not len(lengths) > 0: | |
raise TypeError("ngsci.reader.__validate_alignment expects a list of tuples as its second positional argument") | |
suffix = os.path.splitext(seqpath)[1] | |
if suffix not in alignment_suffixes: | |
raise TypeError("ngsci.reader.__validate_alignment expects the sequence file to have a '.sam', '.bam', or '.cram' suffix") | |
else: | |
mode=dict(zip(alignment_suffixes, ['r', 'rb', 'rc']))[suffix] | |
result = { | |
"filepath": seqpath, | |
"sorted": None, | |
"indexed": None, | |
"temporary": False | |
} | |
if seqpath[0:5] == "s3://": | |
localsam = self.__s3_file_download(seqpath) | |
result["filepath"] = localsam | |
result["temporary"] = True | |
elif not os.path.exists(result["filepath"]) or not os.access(result["filepath"], os.R_OK): | |
raise os.error("ngsci.reader.__validate_alignment expects the file to be accessible on the filesystem") | |
try: | |
logger.debug("Verifying index status of '{0}'...".format(result["filepath"])) | |
samfile = pysam.AlignmentFile(result["filepath"], mode) | |
try: | |
if samfile.check_index(): | |
result["sorted"] = result["filepath"] | |
result["indexed"] = result["filepath"] | |
samfile.close() | |
except (ValueError, AttributeError) as e: | |
#logger.debug(e) | |
logger.debug("Alignnment file '{0}' is not indexed... sorting and indexing now".format(result["filepath"])) | |
sorted_file = tempfile.NamedTemporaryFile(suffix=suffix, mode='w+b', delete=False) | |
sorted_file.close() | |
result["sorted"] = sorted_file.name # The filepath attribute refers to the temporary sorted + indexed file | |
pysam.sort('-o', result["sorted"], result["filepath"]) # Sort the original filepath into the sorted file | |
if result["temporary"] is True: | |
os.unlink(result["filepath"]) # Delete the old temporary file | |
result["filepath"] = result["sorted"] | |
result["temporary"] = True | |
pysam.index(result["sorted"]) | |
result["indexed"] = result["sorted"] | |
samseqs = dict(list(zip(samfile.references, samfile.lengths))) | |
logger.debug("Cross-referencing reference sequences/lengths with alignment file sequences and lengths...") | |
for k, l in samseqs.items(): | |
if k not in lengths.keys(): | |
raise AttributeError("Alignment file '{0}' references fasta sequence '{1}' which was not provided in the fasta file".format(result["indexed"], k)) | |
elif l != lengths[k]: | |
raise AttributeError("Alignment file '{0}' with length {1} references a fasta sequence '{2}' with length {3}".format(result["indexed"], l, k, lengths[k])) | |
samfile.close() | |
except AttributeError as e: | |
logger.error("Enncountered an error while sorting and indexing the alignment file '{0}'".format(result["filepath"])) | |
logger.debug(result) | |
logger.debug(se) | |
samfile.close() | |
if result["temporary"] is True: | |
if result["filepath"] != seqpath: | |
os.unlink(result["filepath"]) | |
sorted_alignment = result["sorted"] | |
indexed_alignment = result["indexed"] | |
if sorted_alignment is not None and os.path.exists(sorted_alignment) and sorted_alignment != seqpath: | |
logger.debug("Removing temporary sorted alignment file '{0}'...".format(sorted_alignment)) | |
os.unlink(sorted_alignment) | |
if indexed_alignment is not None and os.path.exists(indexed_alignment) and indexed_alignment != seqpath: | |
logger.debug("Removing temporary indexed alignment file '{0}'...".format(indexed_alignment)) | |
os.unlink(indexed_alignment) | |
return result | |
def read_blocks(self, reference, start, end): | |
"""This will read a chunk of reads as a pileup column. Usage as follows | |
for pileupcolumn in reader_instance.read_blocks(reference, 1, 200): | |
for pileupread in pileupcolumn.pileups: | |
if not pileupread.is_del and not pileupread.is_refskip: | |
# Logic for strand specificity and paired end | |
# do something with pileupread.alignment | |
# pileupread.alignment.reference_start | |
# pileupread.alignment.reference_end | |
################### | |
# strand-specific | |
################### | |
# :::: Filter criteria :::: | |
# pileupread.alignment.is_proper_pair | |
# :::: Conditionals | |
# pileupread.alignment.is_read1 and not is_reverse # FR pair, F is read one on forward strand | |
# pileupread.alignment.is_read1 and is_reverse # FR pair, F is read one on reverse strand | |
# OTHER ALIGNMENT TYPES UNHANDLED | |
################### | |
# unstranded | |
################### | |
# :::: Filter criteria :::: | |
# OPTIONS? | |
# :::: Conditionals | |
# NONE | |
# pileup.alignment.reference_start | |
# | |
:param reference: | |
:param start: | |
:param end: | |
:returns: | |
:rtype: | |
""" | |
if type(reference) is not str: | |
raise TypeError("ngsci.reader.Reader expects a str 'reference' as its first positional argument") | |
elif type(start) is not int: | |
raise TypeError("ngsci.reader.Reader expects a int 'start' as its second positional argument") | |
elif type(end) is not int: | |
raise TypeError("ngsci.reader.Reader expects a int 'end' as its third positional argument") | |
elif reference not in self.reference_lengths.keys(): | |
raise KeyError("ngsci.reader.Reader.read_blocks expects the reference argument to be present in the reference Fasta") | |
return self.alignment.pileup(reference, start, end) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment