Created
February 2, 2015 18:45
-
-
Save brevans/1055dd80afc0e27f7c94 to your computer and use it in GitHub Desktop.
sub-sampler for Dan, specific to her naming scheme
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import re | |
import random | |
import argparse | |
from os import makedirs | |
from glob import glob | |
import os.path as path | |
from collections import defaultdict as dd | |
from Bio import SeqIO | |
def parse_file(fn, ft): | |
''' | |
Parse a multi-sequence file (fn) of type (ft) with biopython | |
Return a dictionary of individual key -> list of sequences | |
and a dictionary of population key -> set of individuals | |
''' | |
seqs = dd(lambda: []) | |
pops = dd(lambda: set()) | |
for seq_req in SeqIO.parse(fn, ft): | |
mat = re.match('([a-zA-Z]+)_?([a-zA-Z]*\d*)([ab]?)_(\w+)', seq_req.id) | |
try: | |
ind_attribs = mat.groups() | |
except AttributeError as e: | |
print("Sequence Sample {} in file {} didn't match expected naming scheme!".format(seq_req.id, fn)) | |
raise e | |
ind_key = ''.join(ind_attribs[:2]) | |
pops[ind_attribs[0]].add(ind_key) | |
seqs[ind_key].append(seq_req) | |
return seqs, pops | |
def generate_random_samples(files_list, dir_out, rand_indivs, rand_samples, ft): | |
''' | |
Given a list of sequence files, generate random sub-sampling per-population as inferred by individual name | |
Writes the desired number of resamplings and subsampled individuals per population to subfolders | |
''' | |
oom = len(str(rand_samples)) | |
for i in range(rand_samples): | |
chosen_samples = None | |
curr_dir = path.join(dir_out, '{:0{}d}'.format(i+1, oom)) | |
try: | |
makedirs(curr_dir) | |
except FileExistsError as e: | |
pass | |
for fn in files_list: | |
out_fn = path.join(curr_dir, path.basename(fn)) | |
seqs, pops = parse_file(fn, ft) | |
if chosen_samples is None: | |
chosen_samples = [rand for pop in sorted(pops) for rand in random.sample(pops[pop], rand_indivs)] | |
# explode sequence dict for each chosen sample | |
sub_sample = [seq for sam in chosen_samples for seq in seqs[sam]] | |
SeqIO.write(sub_sample, out_fn, ft) | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser(description='Nexus random sub-sampler') | |
parser.add_argument('-r', help='Number of sub-samples to generate [default: 10]', type=int, dest='num_samples', default=10) | |
parser.add_argument('-i', help='Number of individuals to keep per population [default: 1]', type=int, dest='num_inds', | |
default=1) | |
parser.add_argument('-o', help='Output base directory [default: ./sub_samples]', | |
type=str, dest='out_dir', default='./sub_samples') | |
parser.add_argument('-d', help='Input Directory with files [default: current directory]', type=str, dest='in_dir', default='.') | |
parser.add_argument('-t', help='File type. [default: nexus]', choices=['nexus', 'fasta'], type=str, dest='file_type', | |
default='nexus') | |
args = parser.parse_args() | |
if args.file_type == 'nexus': | |
exts = ['.nex'] | |
else: | |
exts = ['.fa', '.fasta'] | |
seq_files = [] | |
for e in exts: | |
for fi in glob(path.join(path.abspath(args.in_dir), '*' + e)): | |
seq_files.append(fi) | |
if len(seq_files) == 0: | |
parser.print_help() | |
print('\nError: Found no sequence files of the type specified\n') | |
sys.exit(1) | |
generate_random_samples(seq_files, path.abspath(args.out_dir), args.num_inds, args.num_samples, args.file_type) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment