Created
September 30, 2025 11:04
-
-
Save theosanderson/839ad884c16323943471c99948eec749 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import subprocess | |
| import tempfile | |
| import gzip | |
| from pathlib import Path | |
| from Bio import SeqIO | |
| def read_fasta_headers(fasta_file): | |
| headers = [] | |
| with gzip.open(fasta_file, 'rt') as f: | |
| for record in SeqIO.parse(f, 'fasta'): | |
| headers.append(record.id) | |
| return headers | |
| def read_fasta_batches(fasta_file, batch_size=1000): | |
| batch = [] | |
| with gzip.open(fasta_file, 'rt') as f: | |
| for record in SeqIO.parse(f, 'fasta'): | |
| batch.append(record) | |
| if len(batch) >= batch_size: | |
| yield batch | |
| batch = [] | |
| if batch: | |
| yield batch | |
| def write_batch_to_fasta(batch, output_file): | |
| with open(output_file, 'w') as f: | |
| SeqIO.write(batch, f, 'fasta') | |
| def run_nextclade(batch, output_tsv): | |
| tool_command = [ | |
| "yourtool", | |
| "--input", "{input}", | |
| "-o", "{output}", | |
| ] | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.fasta', delete=False) as tmp_fasta: | |
| fasta_path = tmp_fasta.name | |
| write_batch_to_fasta(batch, fasta_path) | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.tsv', delete=False) as tmp_tsv: | |
| tsv_path = tmp_tsv.name | |
| try: | |
| cmd = [arg.replace('{input}', fasta_path).replace('{output}', tsv_path) | |
| for arg in tool_command] | |
| result = subprocess.run( | |
| cmd, | |
| capture_output=True, | |
| text=True, | |
| check=True | |
| ) | |
| # Append the TSV results to the output file | |
| with open(tsv_path, 'r') as tsv_in: | |
| with open(output_tsv, 'a') as tsv_out: | |
| tsv_out.write(tsv_in.read()) | |
| return True | |
| except subprocess.CalledProcessError as e: | |
| print(f"Error processing batch: {e}") | |
| print(f"stderr: {e.stderr}") | |
| return False | |
| finally: | |
| Path(fasta_path).unlink(missing_ok=True) | |
| Path(tsv_path).unlink(missing_ok=True) | |
| def main(): | |
| input_fasta = "sequences.fasta.gz" | |
| output_tsv = "all_results.tsv" | |
| batch_size = 1000 | |
| # Clear/create output file | |
| open(output_tsv, 'w').close() | |
| print("Reading FASTA file headers...") | |
| headers = read_fasta_headers(input_fasta) | |
| total_sequences = len(headers) | |
| total_batches = (total_sequences + batch_size - 1) // batch_size | |
| print(f"Found {total_sequences} sequences") | |
| print(f"Will process in {total_batches} batches of {batch_size}") | |
| print(f"\nHeaders preview (first 5):") | |
| for i, header in enumerate(headers[:5], 1): | |
| print(f" {i}. {header}") | |
| if total_sequences > 5: | |
| print(f" ... and {total_sequences - 5} more") | |
| print() | |
| batch_number = 0 | |
| sequences_processed = 0 | |
| for batch in read_fasta_batches(input_fasta, batch_size): | |
| batch_number += 1 | |
| num_seqs = len(batch) | |
| sequences_processed += num_seqs | |
| progress = (sequences_processed / total_sequences) * 100 | |
| print(f"Processing batch {batch_number}/{total_batches} ({num_seqs} sequences) - {progress:.1f}% complete") | |
| success = run_nextclade(batch, output_tsv) | |
| if success: | |
| print(f" ✓ Batch {batch_number} complete") | |
| else: | |
| print(f" ✗ Batch {batch_number} failed") | |
| print(f"\n{'='*60}") | |
| print(f"Finished processing {sequences_processed} sequences in {batch_number} batches") | |
| print(f"Results written to: {output_tsv}") | |
| print(f"{'='*60}") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment