Created
June 1, 2021 22:14
-
-
Save peterk87/de921ea877dbad280ea7b395312f71e8 to your computer and use it in GitHub Desktop.
Rotate a circular contig to start at the same position as a reference sequence
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# coding: utf-8 | |
import logging | |
import subprocess | |
from io import StringIO | |
from pathlib import Path | |
from typing import Optional | |
import sys | |
import pandas as pd | |
import typer | |
from Bio import SeqIO, Entrez | |
from Bio.SeqRecord import SeqRecord | |
from rich.logging import RichHandler | |
from rich.traceback import install | |
BLAST_COLS = [ | |
('qaccver', 'str'), | |
('saccver', 'str'), | |
('stitle', 'str'), | |
('pident', 'float'), | |
('length', 'int'), | |
('mismatch', 'int'), | |
('gapopen', 'int'), | |
('qstart', 'int'), | |
('qend', 'int'), | |
('sstart', 'int'), | |
('send', 'int'), | |
('qlen', 'int'), | |
('slen', 'int'), | |
] | |
def main(assembly_fasta: Path, | |
outdir: Path = typer.Option(Path('rotate_contig-output/'), help='Output directory'), | |
reference_accession: str = typer.Option('AY548484', '-r', '--reference-accession', | |
help='NCBI accession of nucleotide sequence to use for BLAST search'), | |
end_lengths: int = typer.Option(1000, '-l', '--end-lengths', | |
help='Length of seq from ends of reference sequence to use for BLAST search'), | |
min_pident: float = typer.Option(95.0, | |
help='Min nucleotide BLAST % identity for reference ends subsequences search'), | |
entrez_email: str = typer.Option('[email protected]', help='NCBI Entrez email'), | |
force: bool = typer.Option(False, help='Force overwrite output')): | |
"""Rotate a circular contig to start at the same position as a reference sequence | |
Usage: | |
Specify your assembly fasta file and reference accession (e.g. "AY548484"): | |
$ python rotate_contig.py assembly.fasta -r AY548484 | |
Rotated target contig will be output to "rotate_contig-output/rotated.fasta". | |
Requirements: | |
This script requires Python 3.6+ and the biopython, pandas, rich and typer Python modules to be installed: | |
$ pip install biopython pandas rich typer | |
""" | |
install(show_locals=True, width=120, word_wrap=True) | |
logging.basicConfig(format='%(message)s', | |
datefmt='[%Y-%m-%d %X]', | |
level=logging.DEBUG, | |
handlers=[RichHandler(rich_tracebacks=True, | |
tracebacks_show_locals=True)]) | |
if outdir.exists() and not force: | |
logging.error(f'The output directory "{outdir}" already exists! Overwrite previous results with "--force"') | |
sys.exit(1) | |
if not outdir.exists(): | |
outdir.mkdir(parents=True) | |
Entrez.email = entrez_email | |
logging.info(f'Fetching {reference_accession} from NCBI using Entrez API') | |
with Entrez.efetch(db='nucleotide', | |
rettype='fasta', | |
retmode='text', | |
id=reference_accession) as handle: | |
seq_rec: SeqRecord = SeqIO.read(handle, 'fasta') | |
logging.info(f'Fetched "{seq_rec.id} {seq_rec.description}" from NCBI (length={len(seq_rec)})') | |
ref_fasta = outdir / f'{seq_rec.id}.fasta' | |
SeqIO.write([seq_rec], ref_fasta, 'fasta') | |
logging.info(f'Wrote reference fasta to "{ref_fasta}"') | |
ends_fasta = write_ref_ends_fasta(end_lengths, outdir, seq_rec) | |
makeblastdb(assembly_fasta, outdir) | |
blastn_results = blastn(assembly_fasta, ends_fasta) | |
df_blast_ends_top = parse_blastn(blastn_results, min_pident) | |
contig_name = df_blast_ends_top.saccver.unique()[0] | |
asm_recs = {r.id: r for r in SeqIO.parse(assembly_fasta, 'fasta')} | |
contig_seq_record = asm_recs[contig_name] | |
left_end, right_end = get_rotated_ends(contig_seq_record, df_blast_ends_top) | |
if left_end and right_end: | |
output_fasta = write_rotated_fasta(contig_seq_record, outdir, left_end, right_end) | |
logging.info(f'Wrote rotated contig to "{output_fasta}"') | |
else: | |
logging.error(f'Could not find matches to both ends. Start seq = "{left_end}". End seq = "{right_end}"') | |
sys.exit(1) | |
def write_rotated_fasta(contig_seq_record, outdir, s1, s2): | |
rot_rec = SeqRecord((s1.seq + s2.seq), id=contig_seq_record.id, name=contig_seq_record.name) | |
output_fasta = outdir / 'rotated.fasta' | |
SeqIO.write([rot_rec], output_fasta, 'fasta') | |
return output_fasta | |
def write_ref_ends_fasta(end_lengths: int, outdir: Path, seq_rec: SeqRecord) -> Path: | |
start_seq = seq_rec.seq[0:end_lengths] | |
end_seq = seq_rec.seq[-end_lengths:] | |
start_rec_name = f'{seq_rec.id}|1:{len(start_seq)}' | |
start_rec = SeqRecord(start_seq, id=start_rec_name, name=start_rec_name) | |
rec_len = len(seq_rec.seq) | |
end_rec_name = f'{seq_rec.name}|{rec_len - len(end_seq)}:{rec_len}' | |
end_rec = SeqRecord(end_seq, id=end_rec_name, name=end_rec_name) | |
ends_fasta = outdir / f'{seq_rec.id}-{end_lengths}-ends.fasta' | |
SeqIO.write([start_rec, end_rec], ends_fasta, 'fasta') | |
return ends_fasta | |
def get_rotated_ends( | |
contig_seq_record: SeqRecord, | |
df_blast_ends_top: pd.DataFrame | |
) -> (Optional[SeqRecord], Optional[SeqRecord]): | |
left_end: Optional[SeqRecord] = None | |
right_end: Optional[SeqRecord] = None | |
for row in df_blast_ends_top.itertuples(): | |
query_name, seq_range = row.qaccver.split('|') | |
seq_start_idx, seq_end_idx = [int(x) - 1 for x in seq_range.split(':')] | |
do_revcomp = False | |
if row.sstart > row.send: | |
logging.info(f'Reverse complement of subseq matching "{query_name}" ({seq_range}) needed') | |
do_revcomp = True | |
if seq_start_idx == 0: | |
left_end = contig_seq_record[0:max(row.sstart, row.send)] | |
if do_revcomp: | |
left_end = left_end.reverse_complement() | |
else: | |
right_end = contig_seq_record[min(row.sstart, row.send):] | |
if do_revcomp: | |
right_end: SeqRecord = right_end.reverse_complement() | |
return left_end, right_end | |
def parse_blastn(blastn_results: subprocess.CompletedProcess, min_pident: float = 95.0) -> pd.DataFrame: | |
df_blast_ends = pd.read_table(StringIO(blastn_results.stdout.decode()), | |
names=[x for x, _ in BLAST_COLS], | |
dtype={x: y for x, y in BLAST_COLS}) | |
return df_blast_ends[df_blast_ends.pident >= min_pident] | |
def blastn(assembly_fasta: Path, ends_fasta: Path) -> subprocess.CompletedProcess: | |
blastn_cmd = f'blastn ' \ | |
f'-query {ends_fasta.absolute()} ' \ | |
f'-db {assembly_fasta.absolute()} ' \ | |
f'-outfmt "6 {" ".join(x for x, _ in BLAST_COLS)}"' | |
logging.info(f'Running nucleotide BLAST: $ {blastn_cmd}') | |
return subprocess.run( | |
blastn_cmd, | |
check=True, | |
shell=True, | |
stdout=subprocess.PIPE, | |
) | |
def makeblastdb(assembly_fasta: Path, outdir: Path) -> None: | |
db_path = outdir / assembly_fasta.name | |
makeblastdb_cmd = f'makeblastdb ' \ | |
f'-dbtype nucl ' \ | |
f'-in {assembly_fasta.absolute()} ' \ | |
f'-out {db_path.absolute()}' | |
logging.info(f'Making BLAST DB: $ {makeblastdb_cmd}') | |
subprocess.run( | |
makeblastdb_cmd, | |
check=True, | |
shell=True, | |
) | |
if __name__ == '__main__': | |
typer.run(main) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment