Last active
June 14, 2018 15:45
-
-
Save keuv-grvl/c38455affc11bb0abf6cb80ef5b0cf54 to your computer and use it in GitHub Desktop.
MetaGeneAnnotator wrapper. Display a progress bar while executing MGA.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# MetaGeneAnnotator: http://metagene.nig.ac.jp/ | |
import argparse | |
import sys | |
# Version of this wrapper script; this is the value printed by the -v/--version option.
_VERSION = "0.3" | |
def _print_progressbar(step, maxi, msg="", char="=", width=50):
    """
    Print a progress bar, then return the cursor to the beginning of the line.

    The line is terminated with a carriage return instead of a newline, so
    the next call overwrites the previous bar. Call ``print()`` once the work
    is finished to move on to a fresh line.

    Parameters:
        step: Number of items processed so far.
        maxi: Total number of items expected. When it is zero or negative no
            percentage can be computed and the bar is drawn empty (0%).
        msg: Short text displayed after the counters (left-justified and
            padded to 20 characters).
        char: Character used to draw the filled part of the bar.
        width: Width of the bar in characters, not counting the brackets
            and the ``>`` head.

    The percentage is clamped to the 0-100 range so the bar never grows past
    ``width`` when ``step`` exceeds ``maxi``; the raw ``step``/``maxi``
    counters are still displayed unchanged.

    Example:
        import time
        n = 32
        for i in range(n):
            time.sleep(0.1)
            _print_progressbar(i + 1, n, msg="Test", char="=", width=50)
        print()
    """
    if maxi > 0:
        # Clamp so an underestimated `maxi` cannot overflow the bar.
        percent = min(100, max(0, int(100 * step / maxi)))
    else:
        # Avoid ZeroDivisionError when the total is unknown or empty.
        percent = 0
    filled = int(percent * width / 100)
    print(
        "[%s>%s] %d%% (%d/%d) %-20s"
        % (char * filled, " " * (width - filled), percent, step, maxi, msg),
        end="\r",
        flush=True,
    )
def run_mga(args):
    """
    Run MetaGeneAnnotator in a subprocess while parsing its standard output.

    The number of sequences in the input FASTA file is counted first so that
    a progress bar can be displayed while MGA runs. MGA's standard error is
    merged into its standard output.

    Parameters:
        args: Namespace-like object providing the attributes
            ``mgaexec`` (MGA executable name or path), ``species``
            ("multi" or "single"), ``input`` (FASTA file name), ``output``
            (file to write) and ``outfmt`` ("mga" for the raw MGA output,
            anything else for GFF3).

    Returns:
        The MetaGeneAnnotator return code.

    Raises:
        FileNotFoundError: if the MGA executable cannot be found.
        ValueError: if ``args.species`` is neither "multi" nor "single".
    """
    from shutil import which
    import subprocess

    from skbio.io import read as FastaReader

    mga_path = which(args.mgaexec)
    if not mga_path:
        raise FileNotFoundError("MetaGeneAnnotator is not installed")

    species_flags = {"multi": "-m", "single": "-s"}
    species = str(args.species)
    if species not in species_flags:
        raise ValueError(
            "unknown species mode %r (expected 'multi' or 'single')" % species
        )
    mgaopt = species_flags[species]

    # Total number of records, used as the progress bar maximum.
    nb_seq = sum(1 for _ in FastaReader(args.input, format="fasta", verify=False))

    # Argument list executed directly (no shell): with shell=True on POSIX
    # only the first list item would be run and MGA would receive neither
    # its option nor the input file.
    cmd = [mga_path, mgaopt, args.input]
    encoding = sys.getdefaultencoding()
    raw_output = args.outfmt == "mga"
    seqid = None  # name of the sequence whose genes are being listed
    nb_done = 0

    with open(args.output, "w") as outfile:
        if not raw_output:
            print("##gff-version 3", file=outfile)  # GFF3 header

        # The context manager closes the pipe and waits for MGA to exit.
        with subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        ) as proc:
            for raw in proc.stdout:
                line = raw.decode(encoding).rstrip()
                is_comment = line.startswith("#")

                # A comment line other than "# gc..." / "# self..." carries
                # the name of the next sequence: advance the progress bar.
                if is_comment and not line.startswith(("# gc", "# self")):
                    seqid = line[2:]
                    nb_done += 1
                    _print_progressbar(nb_done, nb_seq, msg=seqid)

                if raw_output:
                    print(line, file=outfile)
                    continue
                if is_comment:
                    continue

                fields = line.split("\t")
                if seqid is None or len(fields) != 11:
                    # Blank line or message coming from MGA's stderr: report
                    # it instead of crashing on the unpacking below.
                    if line:
                        print(
                            "Skipping unexpected MGA output: %s" % line,
                            file=sys.stderr,
                        )
                    continue

                (_, start, end, strand, frame, _, score, _, _, _, _) = fields
                print(
                    seqid,
                    "MGA",
                    "gene",
                    start,
                    end,
                    score,
                    strand,
                    frame,
                    ".",
                    sep="\t",
                    file=outfile,
                )

        print()  # leave the progress bar line

    return proc.returncode
if __name__ == "__main__":
    # Command-line entry point: parse the options, run MGA and exit with
    # MGA's own return code.
    parser = argparse.ArgumentParser(description="Wrapper for MetaGeneAnnotator")

    requiredNamed = parser.add_argument_group("required named arguments")
    requiredNamed.add_argument(
        "--input", help="Input file name (FASTA format)", required=True
    )
    requiredNamed.add_argument(
        "--species",
        choices=["multi", "single"],
        help="Sequences are treated individually (multi) or as a unit (single)",
        required=True,
    )

    parser.add_argument("--output", help="Output file name", default="output.gff")
    parser.add_argument(
        "--outfmt", help="Output format", choices=["mga", "gff"], default="gff"
    )
    parser.add_argument(
        "--mgaexec", help="Path to MetaGeneAnnotator executable", default="mga"
    )
    # The "version" action prints and exits while parsing, so `--version`
    # works on its own, without the required --input/--species arguments.
    parser.add_argument(
        "-v",
        "--version",
        help="Print wrapper version",
        action="version",
        version=_VERSION,
    )

    args = parser.parse_args()
    sys.exit(run_mga(args))  # exit as MGA
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment