Skip to content

Instantly share code, notes, and snippets.

@apcamargo
Last active August 23, 2024 03:39
Show Gist options
  • Save apcamargo/d039aa04a2cbbcbb14e2d34a0963b862 to your computer and use it in GitHub Desktop.
Save apcamargo/d039aa04a2cbbcbb14e2d34a0963b862 to your computer and use it in GitHub Desktop.
Fancy FASTA parser in Python
import bz2
import gzip
import lzma
import textwrap
from contextlib import contextmanager
from enum import Enum, auto
from pathlib import Path
class Compression(Enum):
bzip2 = auto()
gzip = auto()
xz = auto()
zstd = auto()
uncompressed = auto()
def is_compressed(filepath: Path):
with open(filepath, "rb") as fin:
signature = fin.peek(8)[:8]
if tuple(signature[:2]) == (0x1F, 0x8B):
return Compression.gzip
elif tuple(signature[:3]) == (0x42, 0x5A, 0x68):
return Compression.bzip2
elif tuple(signature[:7]) == (0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00, 0x00):
return Compression.xz
elif tuple(signature[:4]) == (0x28, 0xB5, 0x2F, 0xFD):
return Compression.zstd
else:
return Compression.uncompressed
@contextmanager
def open_file(filepath):
filepath_compression = is_compressed(filepath)
if filepath_compression is Compression.gzip:
fin = gzip.open(filepath, "rt")
elif filepath_compression is Compression.bzip2:
fin = bz2.open(filepath, "rt")
elif filepath_compression is Compression.xz:
fin = lzma.open(filepath, "rt")
else:
fin = open(filepath, "r")
try:
yield fin
finally:
fin.close()
class Sequence:
def __init__(self, header: str, seq: str, compress: bool = False):
self._compress = compress
self._header = header
if self._compress:
self._seq = gzip.compress(seq.encode("ascii"), 1)
else:
self._seq = seq.encode("ascii")
@property
def header(self):
return self._header
@property
def accession(self):
return self._header.split()[0]
@property
def seq(self):
if self._compress:
return gzip.decompress(self._seq).decode()
else:
return self._seq.decode()
@property
def seq_ascii(self):
return self.seq.upper().encode("ascii")
def count(self, substring: str):
return self.seq.count(substring)
def rc(self):
tab = self.seq.maketrans("ACTGNactgn", "TGACNtgacn")
return Sequence(self.header, self.seq.translate(tab)[::-1], self._compress)
def has_dtr(self, min_length: int = 21):
substring = self.seq.casefold()[:min_length]
pos = self.seq.casefold().rfind(substring)
if pos < len(self) / 2:
return False, 0
substring = self.seq.casefold()[pos:]
return self.seq.casefold()[: len(substring)] == substring, len(substring)
def has_itr(self, min_len: int = 21):
rev = self.rc().seq
if self.seq.casefold()[:min_len] == rev.casefold()[:min_len]:
i = min_len + 1
while self.seq.casefold()[:i] == rev.casefold()[:i] and i <= len(self) // 2:
i += 1
return True, i - 1
else:
return False, 0
def __str__(self):
return f">{self.header}\n{textwrap.fill(self.seq, 60, break_on_hyphens=False)}\n"
def __repr__(self):
if len(self) > 40:
start = self.seq[:34]
end = self.seq[-3:]
seq = f"{start}...{end}"
else:
seq = self.seq
return f"Sequence({self.accession}, {seq})"
def __len__(self):
return len(self.seq)
def __getitem__(self, k: int):
return Sequence(self.header, self.seq[k], self._compress)
def __eq__(self, other: object):
if other.__class__ is self.__class__:
return self.seq.casefold() == other.seq.casefold()
elif other.__class__ is str:
return self.seq.casefold() == other.casefold()
return NotImplemented
def __hash__(self):
return hash(self.seq.casefold())
def __add__(self, other: object):
if other.__class__ is not self.__class__:
return NotImplemented
compress = other._compress or self._compress
return Sequence(
f"{self.accession}+{other.accession}", f"{self.seq}{other.seq}", compress
)
def read_fasta(filepath, uppercase=False, strip_n=False, compress=False):
with open_file(filepath) as fin:
last = None
while True:
if not last:
for l in fin:
if l[0] == ">":
last = l[:-1]
break
if not last:
break
name, seqs, last = last[1:], [], None
for l in fin:
if l[0] == ">":
last = l[:-1]
break
seqs.append(l[:-1])
seqs = "".join(seqs)
if uppercase:
seqs = seqs.upper()
if strip_n:
seqs = seqs.strip("nN")
if len(seqs):
yield Sequence(name, seqs, compress)
if not last:
break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment