Last active
August 20, 2022 08:26
-
-
Save terrymun/1ecb80f22afb9a6e6600d5355b80351d to your computer and use it in GitHub Desktop.
Convert multi-sequence FASTA file into individual files containing n number of FASTA entries
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Load libraries | |
import os, sys, time, re | |
from Bio import SeqIO | |
from itertools import zip_longest | |
# File path | |
currentDir = os.path.dirname(os.path.realpath(__file__)) | |
# Configuration | |
minSeqLength = 80 # Do not include sequences shorter than this | |
n = 500 # Batch size | |
# Start reading/writing | |
with \ | |
open(currentDir + '/badRecords.txt', 'w') as badRecordsFile, \ | |
open(currentDir + '/correctedRecords.txt', 'w') as correctedRecordsFile, \ | |
open(currentDir + '/shortRecords.txt', 'w') as shortRecordsFile, \ | |
open(currentDir + '/filtered.fa', 'w') as filteredFasta, \ | |
open(currentDir + '/20130521_Lj30_proteins.fa', 'r') as fasta: | |
count = 0 | |
records = {'corrected': [], 'bad': [], 'tooShort': []} | |
# Make output directory if not present | |
if not os.path.exists(currentDir + '/seqs'): | |
os.makedirs(currentDir + '/seqs') | |
# Iterate through all records | |
for record in SeqIO.parse(fasta, 'fasta'): | |
count += 1 | |
# Only process if sequence is longer than 80 amino acids | |
if len(str(record.seq)) >= minSeqLength: | |
if re.match(r'(.*)\*$', str(record.seq)): | |
records['corrected'].append(record.id) | |
correctedRecordsFile.write(record.id+'\n') | |
sequence = re.sub(r'(.*)\*$', r'\1', str(record.seq)) | |
if '*' in sequence: | |
records['bad'].append(record.id) | |
badRecordsFile.write(record.id+'\n') | |
print(str(record.id) + ' Warning: Entry contains non-trailing stop codon.') | |
else: | |
filteredFasta.write('>'+str(record.id)+'\n') | |
filteredFasta.write(sequence+'\n') | |
if count%1000 == 0: | |
print('Records processed: '+str(count)) | |
else: | |
records['tooShort'].append(record.id) | |
shortRecordsFile.write(record.id+'\n') | |
print(str(record.id) + ' Warning: Entry is shorter than minimum sequence length of '+str(minSeqLength)) | |
print('====================================') | |
print('End of file') | |
print('====================================') | |
print('Total records processed: '+str(count)) | |
print('Corrected records (all is good): '+str(len(records['corrected']))) | |
print('Problematic records (requires attention): '+str(len(records['bad']))) | |
print('Short records (not written): '+str(len(records['tooShort']))) | |
print('====================================') | |
# Create FASTA in batches | |
def grouper(n, iterable, padvalue=None): | |
return zip_longest(*[iter(iterable)]*n, fillvalue=padvalue) | |
with \ | |
open(currentDir + '/filtered.fa', 'r') as filteredFasta: | |
# Make output directory if not present | |
if not os.path.exists(currentDir + '/seqs_batch'): | |
os.makedirs(currentDir + '/seqs_batch') | |
# Use n*2 because each FASTA entry occupies two lines (header + sequence) | |
for i, g in enumerate(grouper(n*2, filteredFasta, padvalue=''), 1): | |
with open(currentDir + '/seqs_batch/filtered_batch' + str(i) + '.fa', 'w') as batchFasta: | |
batchFasta.writelines(g) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment