Skip to content

Instantly share code, notes, and snippets.

@keuv-grvl
Last active June 14, 2018 15:45
Show Gist options
  • Save keuv-grvl/c38455affc11bb0abf6cb80ef5b0cf54 to your computer and use it in GitHub Desktop.
Save keuv-grvl/c38455affc11bb0abf6cb80ef5b0cf54 to your computer and use it in GitHub Desktop.
MetaGeneAnnotator wrapper. Display a progress bar while executing MGA.
#!/usr/bin/env python3
# MetaGeneAnnotator: http://metagene.nig.ac.jp/
import argparse
import sys
# Version of this wrapper script; printed when -v/--version is given.
_VERSION = "0.3"
def _print_progressbar(step, maxi, msg="", char="=", width=50):
    """
    Print a progress bar, then place the cursor at the beginning of the line.

    The line is terminated by a carriage return rather than a newline, so the
    next call overwrites the previous bar. Call ``print()`` once the loop is
    finished to move on to a fresh line.

    Args:
        step: Number of items processed so far.
        maxi: Total number of items expected.
        msg: Label shown after the counters (left-aligned, padded to 20
            characters).
        char: Character used to draw the filled part of the bar.
        width: Width of the bar in characters, excluding the brackets and
            the ``>`` head.

    The percentage is clamped to 0-100 so the bar keeps a constant width even
    when `step` exceeds `maxi` or is negative, and a non-positive `maxi` is
    rendered as 0% instead of raising ZeroDivisionError. The `step` and
    `maxi` counters themselves are displayed as given.

    Example:
        import time
        n = 32
        for i in range(n):
            time.sleep(0.1)
            _print_progressbar(i + 1, n, msg="Test", char="=", width=50)
        print()
    """
    if maxi > 0:
        percent = min(100, max(0, int(100 * step / maxi)))
    else:
        # Nothing sensible to divide by: show an empty bar.
        percent = 0
    filled = int(percent * width / 100)
    print(
        "[%s>%s] %d%% (%d/%d) %-20s"
        % (char * filled, " " * (width - filled), percent, step, maxi, msg),
        end="\r",
        flush=True,
    )
def run_mga(args):
    """
    Run MetaGeneAnnotator in a subprocess while parsing its standard output.

    Args:
        args: Namespace-like object providing the attributes
            ``mgaexec`` (executable name or path), ``species``
            ("multi" or "single"), ``input`` (FASTA file name),
            ``output`` (output file name) and ``outfmt`` ("mga" or "gff").

    With ``outfmt == "mga"`` every output line of MGA is copied verbatim to
    the output file. Otherwise each gene line (11 tab-separated fields) is
    rewritten as a GFF-style record attached to the most recent sequence
    header, and a progress bar is updated for every new sequence.

    Returns:
        The MetaGeneAnnotator return code.

    Raises:
        ValueError: if ``args.species`` is neither "multi" nor "single".
        FileNotFoundError: if the MGA executable cannot be found.
    """
    from shutil import which
    import subprocess

    mga_flags = {"multi": "-m", "single": "-s"}
    species = str(args.species)
    if species not in mga_flags:
        raise ValueError(
            "Unknown species mode %r (expected 'multi' or 'single')" % species
        )
    if not which(args.mgaexec):
        raise FileNotFoundError("MetaGeneAnnotator is not installed")

    # Imported only after the arguments have been validated, so that invalid
    # arguments are reported before the third-party dependency is needed.
    from skbio.io import read as FastaReader

    # Total number of sequences, used as the progress bar maximum.
    nb_seq = sum(1 for _ in FastaReader(args.input, format="fasta", verify=False))
    cmd = [args.mgaexec, mga_flags[species], args.input]

    seqid = None  # header of the sequence whose genes are being read
    done = 0  # number of sequence headers seen so far
    with open(args.output, "w") as outfile:
        # The command is a list, so it must NOT go through the shell: with
        # shell=True on POSIX only cmd[0] is executed and the MGA option and
        # the input file would be handed to the shell itself instead.
        with subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        ) as proc:
            if args.outfmt == "gff":
                print("##gff-version 3", file=outfile)  # GFF3 header
            for raw in proc.stdout:
                line = raw.decode(sys.getdefaultencoding()).rstrip()
                if args.outfmt == "mga":
                    print(line, file=outfile)
                    continue
                if line.startswith("#"):
                    # "# gc" and "# self" comment lines carry per-sequence
                    # details; any other comment line starts a new sequence.
                    if not line.startswith(("# gc", "# self")):
                        seqid = line[2:]
                        done += 1
                        _print_progressbar(done, nb_seq, msg=seqid)
                    continue
                fields = line.split("\t")
                if len(fields) != 11 or seqid is None:
                    # Not a gene record (blank line, or a message coming from
                    # the merged stderr stream): report it instead of crashing
                    # on the unpacking below.
                    if line:
                        print(line, file=sys.stderr)
                    continue
                _, start, end, strand, frame, _, score, _, _, _, _ = fields
                print(
                    seqid,
                    "MGA",
                    "gene",
                    start,
                    end,
                    score,
                    strand,
                    frame,
                    ".",
                    sep="\t",
                    file=outfile,
                )
    print()  # leave the progress bar line
    # Leaving the Popen context has already waited for the process.
    return proc.returncode
if __name__ == "__main__":
    # Command-line entry point: parse the options, run MGA and exit with
    # MGA's own return code.
    parser = argparse.ArgumentParser(description="Wrapper for MetaGeneAnnotator")
    requiredNamed = parser.add_argument_group("required named arguments")
    requiredNamed.add_argument(
        "--input", help="Input file name (FASTA format)", required=True
    )
    requiredNamed.add_argument(
        "--species",
        choices=["multi", "single"],
        help="Sequences are treated individually (multi) or as a unit (single)",
        required=True,
    )
    parser.add_argument("--output", help="Output file name", default="output.gff")
    parser.add_argument(
        "--outfmt", help="Output format", choices=["mga", "gff"], default="gff"
    )
    parser.add_argument(
        "--mgaexec", help="Path to MetaGeneAnnotator executable", default="mga"
    )
    # action="version" prints the version and exits while parsing, so
    # `-v` works on its own without --input/--species being supplied.
    parser.add_argument(
        "-v",
        "--version",
        help="Print wrapper version",
        action="version",
        version=_VERSION,
    )
    args = parser.parse_args()
    ret = run_mga(args)
    sys.exit(ret)  # exit as MGA
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment