Created
July 13, 2023 14:06
-
-
Save philippmuench/c8eaeb4c8d9880a8760173b17314e3c6 to your computer and use it in GitHub Desktop.
check for malformed file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import argparse | |
import random | |
def is_fasta(filename):
    """Classify a file by inspecting its first non-blank line.

    Args:
        filename: Path of the file to inspect.

    Returns:
        One of four strings:
        'valid'     - the first non-blank line (after stripping surrounding
                      whitespace) starts with '>'.
        'malformed' - the first non-blank line starts with anything else.
        'empty'     - the file has no bytes or only whitespace.
        'binary'    - the content could not be decoded as UTF-8 text.
    """
    try:
        # Pin the encoding so "binary" detection does not depend on the
        # platform's default locale encoding.
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                stripped = line.strip()
                if not stripped:
                    # Leading blank lines do not make a file empty; keep
                    # looking for the first line with real content.
                    continue
                return 'valid' if stripped.startswith('>') else 'malformed'
    except UnicodeDecodeError:
        return 'binary'
    # Reached only when no line contained non-whitespace characters.
    return 'empty'
def calculate_avg_file_size(directory, files):
    """Estimate the average file size in bytes from a random sample.

    Roughly 1% of ``files`` (but at least one file) is sampled at random and
    the mean size of the sampled files is returned.

    Args:
        directory: Base directory that every entry of ``files`` is joined
            onto with ``os.path.join``.
        files: Sequence of file paths. Absolute entries are used as-is,
            because ``os.path.join`` discards ``directory`` in that case.

    Returns:
        The mean size in bytes of the sampled files, or ``0.0`` when
        ``files`` is empty.

    Raises:
        OSError: If a sampled file does not exist or cannot be stat'ed.
    """
    if not files:
        # random.sample cannot draw one element from an empty population;
        # report an average of zero instead of raising ValueError.
        return 0.0
    sample_size = max(1, len(files) // 100)  # 1% of files, but at least one file
    sample_files = random.sample(files, sample_size)
    total_size = sum(
        os.path.getsize(os.path.join(directory, name)) for name in sample_files
    )
    return total_size / sample_size
def find_problematic_files(directory, big_files_path, empty_files_path, malformed_files_path):
    """Scan a directory tree for oversized, empty, or malformed .fasta files.

    Every file ending in '.fasta' under ``directory`` is checked. A file is
    "too big" when it is larger than three times the average size estimated
    by ``calculate_avg_file_size``; remaining files are classified with
    ``is_fasta``. Each problematic path is printed and appended to the
    matching log file, and a summary line is printed at the end.

    Args:
        directory: Root directory to walk recursively.
        big_files_path: Output file listing the too-big files, one per line.
        empty_files_path: Output file listing the empty files, one per line.
        malformed_files_path: Output file listing every other non-valid file
            (anything ``is_fasta`` reports as neither 'valid' nor 'empty').

    All three output files are created (or truncated) even when nothing is
    found.
    """
    # os.walk yields roots that already start with `directory`, so these are
    # complete paths and must not be joined onto `directory` a second time.
    fasta_files = [os.path.join(root, name)
                   for root, _, names in os.walk(directory)
                   for name in names if name.endswith('.fasta')]

    if fasta_files:
        # calculate_avg_file_size joins each entry onto `directory`; hand it
        # directory-relative paths so a relative `directory` is not doubled.
        relative_paths = [os.path.relpath(path, directory) for path in fasta_files]
        avg_file_size = calculate_avg_file_size(directory, relative_paths)
        max_file_size = 3 * avg_file_size  # Any file larger than this is considered too big

        print(f"Based on sampled {max(1, len(fasta_files) // 100)} files, the expected file size should be around {avg_file_size / (1024 * 1024):.2f} MB.")
        print(f"Files larger than {max_file_size / (1024 * 1024):.2f} MB will be considered too big.")
    else:
        # Nothing to sample; skip the estimate rather than sampling from an
        # empty list. The loop below is a no-op in this case.
        max_file_size = 0.0
        print(f"No .fasta files found in {directory}.")

    big_files_count = 0
    empty_files_count = 0
    malformed_files_count = 0

    with open(big_files_path, 'w') as big_files, \
            open(empty_files_path, 'w') as empty_files, \
            open(malformed_files_path, 'w') as malformed_files:
        for file_path in fasta_files:
            file_size = os.path.getsize(file_path)

            if file_size > max_file_size:
                print(f"Too big file: {file_path} ({file_size / (1024 * 1024):.2f} MB)")
                big_files.write(f"{file_path}\n")
                big_files_count += 1
                # Oversized files are not opened for content checks.
                continue

            file_type = is_fasta(file_path)
            if file_type != 'valid':
                print(f"{file_type.capitalize()} file: {file_path}")
                if file_type == 'empty':
                    empty_files.write(f"{file_path}\n")
                    empty_files_count += 1
                else:
                    # Every other non-valid classification is logged here.
                    malformed_files.write(f"{file_path}\n")
                    malformed_files_count += 1

    print(f"\nFound {big_files_count} too big files, {empty_files_count} empty files, and {malformed_files_count} malformed files.")
# Run the command-line interface only when executed as a script, so that the
# functions in this file can be imported without triggering argument parsing
# or a directory scan.
if __name__ == "__main__":
    # Argument parser
    parser = argparse.ArgumentParser(description="Find problematic .fasta files in a directory.")
    parser.add_argument("--dir", help="Directory to search for .fasta files.", required=True)
    parser.add_argument("--big_files", default="big_files.txt", help="Path to file for logging big files.")
    parser.add_argument("--empty_files", default="empty_files.txt", help="Path to file for logging empty files.")
    parser.add_argument("--malformed_files", default="malformed_files.txt", help="Path to file for logging malformed files.")
    args = parser.parse_args()

    find_problematic_files(args.dir, args.big_files, args.empty_files, args.malformed_files)

    # Tell the user where each report was written.
    print(f"Big files written to: {args.big_files}")
    print(f"Empty files written to: {args.empty_files}")
    print(f"Malformed files written to: {args.malformed_files}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment