Skip to content

Instantly share code, notes, and snippets.

@apcamargo
Last active February 1, 2025 01:43
Show Gist options
  • Save apcamargo/449fbe5c33f25b22b25b195d44d57624 to your computer and use it in GitHub Desktop.
Save apcamargo/449fbe5c33f25b22b25b195d44d57624 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# hhblits -v 0 -cpu 1 -n 1 -p 90 -z 0 -Z 5000 -b 0 -B 5000 -M 50 -d busco_db/busco -i msa.faa -o msa.hhr
from collections import namedtuple
from pathlib import Path
import argparse
parser = argparse.ArgumentParser(description='Parse hhsearch hhr output file.')
parser.add_argument('-i', help='input hrr path', dest='input_file',type=str, required=True)
parser.add_argument('-o', help='output path for parsed tabular', dest='output_file',type=str, required=True)
args = parser.parse_args()
input_file = args.input_file
output_file = args.output_file
hhr_alignment = namedtuple(
"hhr_alignment",
[
"query_id",
"query_length",
"query_neff",
"template_id",
"template_length",
"template_info",
"template_neff",
"query_ali",
"template_ali",
"start",
"end",
"probability",
"evalue",
"score",
"aligned_cols",
"identity",
"similarity",
"sum_probs",
"query_coverage",
"template_coverage",
],
)
class HHRFormatError(Exception):
def __init__(self, value):
self.value = "ERROR: " + value
def __str__(self):
return repr(self.value)
def parse_result(lines):
results = []
query_id = None
query_length = None
query_neff = None
query_seq = []
template_id = None
template_length = None
template_seq = []
template_info = None
query_start = None
query_end = None
template_start = None
template_end = None
probability = None
evalue = None
score = None
identity = None
similarity = None
template_neff = None
sum_probs = None
aligned_cols = None
query_coverage = None
template_coverage = None
skipped_ali_tags = ["ss_dssp", "ss_pred", "Consensus"]
is_alignment_section = False
for line in lines:
if line.startswith("Query"):
query_id = line.split()[1]
elif line.startswith("Match_columns"):
query_length = int(line.split()[1])
elif line.startswith("Neff"):
query_neff = float(line.split()[1])
elif is_alignment_section and (
line.startswith("No") or line.startswith("Done!")
):
if query_start is not None:
result = hhr_alignment(
# query_id,
Path(input_file).stem,
query_length,
query_neff,
template_id,
template_length,
template_info,
template_neff,
"".join(query_seq),
"".join(template_seq),
(query_start, template_start),
(query_end, template_end),
probability,
evalue,
score,
aligned_cols,
identity,
similarity,
sum_probs,
query_coverage,
template_coverage,
)
results.append(result)
template_id = None
template_info = None
query_seq = []
template_seq = []
query_start = None
query_end = None
template_start = None
template_end = None
elif line.startswith("Probab"):
tokens = line.split()
probability = float(tokens[0].split("=")[1])
evalue = float(tokens[1].split("=")[1])
score = float(tokens[2].split("=")[1])
aligned_cols = int(tokens[3].split("=")[1])
identity = float(tokens[4].split("=")[1].replace("%", "")) / 100.0
similarity = float(tokens[5].split("=")[1])
sum_probs = float(tokens[6].split("=")[1])
if len(tokens) > 7:
template_neff = float(tokens[7].split("=")[1])
continue
elif line.startswith(">"):
is_alignment_section = True
template_id = line[1:].split()[0]
template_info = line
elif line.startswith("Q"):
tokens = line.split()
if tokens[1] in skipped_ali_tags:
continue
try:
token_2 = tokens[2].replace("(", "").replace(")", "")
token_2 = int(token_2)
except:
raise HHRFormatError(
(
"Converting failure of start index ({}) " "of query alignment"
).format(tokens[2])
)
if query_start is None:
query_start = token_2
query_start = min(query_start, token_2)
try:
token_4 = tokens[4].replace("(", "").replace(")", "")
token_4 = int(token_4)
except:
raise HHRFormatError(
(
"Converting failure of end index ({}) " "of query alignment"
).format(tokens[4])
)
if query_end is None:
query_end = token_4
query_end = max(query_end, token_4)
query_seq.append(tokens[3])
elif line.startswith("T"):
tokens = line.split()
if tokens[1] in skipped_ali_tags:
continue
template_seq.append(tokens[3])
try:
token_2 = tokens[2].replace("(", "").replace(")", "")
token_2 = int(token_2)
except:
raise HHRFormatError(
(
"Converting failure of start index ({}) "
"of template alignment"
).format(tokens[2])
)
if template_start is None:
template_start = token_2
template_start = min(template_start, token_2)
try:
token_4 = tokens[4].replace("(", "").replace(")", "")
token_4 = int(token_4)
except:
raise HHRFormatError(
(
"Converting failure of end index ({}) " "of template alignment"
).format(tokens[4])
)
if template_end is None:
template_end = token_4
template_end = max(template_end, token_4)
try:
token_5 = tokens[4].replace("(", "").replace(")", "")
token_5 = int(token_5)
except:
raise HHRFormatError(
(
"Converting failure of template length ({}) "
"in template alignment"
).format(tokens[5])
)
template_length = token_5
query_coverage = aligned_cols / query_length
template_coverage = aligned_cols / template_length
if template_id is not None and query_start is not None:
result = hhr_alignment(
# query_id,
Path(input_file).stem,
query_length,
query_neff,
template_id,
template_length,
template_info,
template_neff,
"".join(query_seq),
"".join(template_seq),
(query_start, template_start),
(query_end, template_end),
probability,
evalue,
score,
aligned_cols,
identity,
similarity,
sum_probs,
query_coverage,
template_coverage,
)
results.append(result)
return results
def read_result(input_file):
with open(input_file) as fh:
lines = fh.readlines()
return parse_result(lines)
def parse_hhr(input_file, output_file):
with open(output_file, "w") as fout:
header = (
"qid"
+ "\t"
+ "tid"
+ "\t"
+ "qlen"
+ "\t"
+ "tlen"
+ "\t"
+ "qcov"
+ "\t"
+ "tcov"
+ "\t"
+ "qcoord"
+ "\t"
+ "tcoord"
+ "\t"
+ "prob"
+ "\t"
+ "eval"
+ "\t"
+ "score"
+ "\n"
)
fout.write(header)
for result in read_result(input_file):
qcoord = map(str, [result.start[0], result.end[0]])
tcoord = map(str, [result.start[1], result.end[1]])
fout.write(
result.query_id
+ "\t"
+ result.template_id
+ "\t"
+ str(result.query_length)
+ "\t"
+ str(result.template_length)
+ "\t"
+ "{0:.4f}".format(result.query_coverage)
+ "\t"
+ "{0:.4f}".format(result.template_coverage)
+ "\t"
+ "-".join(qcoord)
+ "\t"
+ "-".join(tcoord)
+ "\t"
+ "{0:.4f}".format(result.probability / 100)
+ "\t"
+ "{0:.2e}".format(result.evalue)
+ "\t"
+ "{0:.2f}".format(result.score)
+ "\n"
)
parse_hhr(input_file, output_file)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment