download genomes from NCBI: the script downloads gzipped genomic FASTA files over HTTPS, extracts them, verifies each one against a remote MD5 checksum, and keeps a resumable record of processed files.
import os
import hashlib
from pathlib import Path
import requests
import logging
from colorama import Fore, Style, init
import gzip
import shutil
import time
import random
import json

# Initialize colorama
init(autoreset=True)

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def calculate_md5(file_path):
    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
def get_remote_checksum(https_url, filename):
    # Assumes each line of the checksum file pairs a filename with its MD5,
    # i.e. '<filename> <md5>', so the checksum is the second whitespace-separated field.
    checksum_url = f"{https_url}/uncompressed_checksums.txt"
    logging.info(f"Downloading checksum file from: {checksum_url}")
    try:
        response = requests.get(checksum_url, timeout=30)
        response.raise_for_status()
        content = response.text
        for line in content.splitlines():
            if filename in line:
                return line.split()[1]  # Return the MD5 checksum
        logging.warning(f"No matching checksum found for {filename}")
        return None
    except requests.exceptions.RequestException as e:
        logging.error(f"Error downloading checksum file: {str(e)}")
        return None
def download_and_extract_fasta(https_url, filename, local_file_path):
    fasta_url = f"{https_url}/{filename}.gz"
    logging.info(f"Downloading FASTA file from: {fasta_url}")
    try:
        response = requests.get(fasta_url, timeout=30)
        response.raise_for_status()
        # Save the gzipped file
        gzip_path = local_file_path.with_suffix('.fna.gz')
        with open(gzip_path, 'wb') as f:
            f.write(response.content)
        # Extract the gzipped file
        try:
            with gzip.open(gzip_path, 'rb') as f_in:
                with open(local_file_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
        except gzip.BadGzipFile as e:
            logging.error(f"Error extracting gzip file: {str(e)}")
            os.remove(gzip_path)
            return False
        # Remove the gzipped file
        os.remove(gzip_path)
        logging.info(f"Successfully downloaded and extracted {local_file_path}")
        return True
    except requests.exceptions.RequestException as e:
        logging.error(f"Error downloading FASTA file: {str(e)}")
        return False
def verify_and_redownload(file, https_path, remote_md5, max_attempts=3):
    local_file_path = Path(file)
    for attempt in range(max_attempts):
        local_md5 = calculate_md5(local_file_path)
        if local_md5 == remote_md5:
            print(f"{Fore.GREEN}{file}: Checksum match{Style.RESET_ALL}")
            return True
        print(f"{Fore.RED}{file}: Checksum mismatch (Attempt {attempt + 1}/{max_attempts}){Style.RESET_ALL}")
        print(f" Local MD5: {local_md5}")
        print(f" Remote MD5: {remote_md5}")
        if attempt < max_attempts - 1:
            print(f"{Fore.YELLOW}Attempting to re-download the file...{Style.RESET_ALL}")
            if os.path.exists(local_file_path):
                os.remove(local_file_path)
            # Pass .name (not .stem) so the URL keeps its .fna extension before .gz is appended
            if download_and_extract_fasta(https_path, local_file_path.name, local_file_path):
                time.sleep(random.uniform(1, 5))  # Wait for 1-5 seconds before next attempt
            else:
                print(f"{Fore.RED}Failed to re-download the file.{Style.RESET_ALL}")
                return False
    print(f"{Fore.RED}Failed to obtain a matching checksum after {max_attempts} attempts.{Style.RESET_ALL}")
    return False
def main():
    print("Script started")
    fasta_folder = Path("fasta")
    fasta_folder.mkdir(exist_ok=True)
    ftp_list_file = "LA_path_ftp_folder.txt"
    skipped_files = []
    processed_files = []

    # Load previously processed files
    processed_files_path = Path("processed_files.json")
    if processed_files_path.exists():
        with open(processed_files_path, 'r') as f:
            processed_files = json.load(f)
        print(f"Loaded {len(processed_files)} previously processed files")
    else:
        print("No previously processed files found")

    with open(ftp_list_file, 'r') as f:
        ftp_paths = [line.strip() for line in f if line.strip()]
    print(f"Found {len(ftp_paths)} FTP paths")

    for ftp_path in ftp_paths:
        https_path = f"https://{ftp_path}"
        accession = ftp_path.split('/')[-1]
        filename = f"{accession}_genomic.fna"
        local_file_path = fasta_folder / filename
        logging.info(f"Processing file: {filename}")

        # Skip if file has already been processed successfully
        if filename in processed_files:
            print(f"{Fore.BLUE}{filename}: Already processed, skipping{Style.RESET_ALL}")
            continue

        remote_md5 = get_remote_checksum(https_path, filename)
        if not remote_md5:
            logging.warning(f"Remote checksum not found for {filename}")
            print(f"{Fore.YELLOW}{filename}: Remote checksum not found{Style.RESET_ALL}")
            skipped_files.append({"file": filename, "reason": "Remote checksum not found"})
            continue

        if local_file_path.exists():
            print(f"{Fore.YELLOW}{filename}: File exists locally, verifying...{Style.RESET_ALL}")
            if verify_and_redownload(local_file_path, https_path, remote_md5):
                processed_files.append(filename)
                print(f"{Fore.GREEN}{filename}: Verification successful, marked as processed{Style.RESET_ALL}")
            else:
                skipped_files.append({"file": filename, "reason": "Failed to verify"})
        else:
            print(f"{Fore.YELLOW}{filename}: File does not exist locally, downloading...{Style.RESET_ALL}")
            if download_and_extract_fasta(https_path, filename, local_file_path):
                if verify_and_redownload(local_file_path, https_path, remote_md5):
                    processed_files.append(filename)
                    print(f"{Fore.GREEN}{filename}: Download and verification successful, marked as processed{Style.RESET_ALL}")
                else:
                    skipped_files.append({"file": filename, "reason": "Failed to verify"})
            else:
                skipped_files.append({"file": filename, "reason": "Failed to download"})

        # Save progress after each file
        with open(processed_files_path, 'w') as f:
            json.dump(processed_files, f, indent=2)
        print(f"Progress saved: {len(processed_files)} files processed so far")

    # Save skipped files information
    if skipped_files:
        with open("skipped_files.json", "w") as f:
            json.dump(skipped_files, f, indent=2)
        print(f"{Fore.YELLOW}Information about skipped files has been saved to skipped_files.json{Style.RESET_ALL}")

    print(f"Script completed. Processed {len(processed_files)} files, skipped {len(skipped_files)} files.")

if __name__ == "__main__":
    main()
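Usage note: the only third-party dependencies are requests and colorama (pip install requests colorama); everything else is standard library. The script expects LA_path_ftp_folder.txt in the working directory, writes extracted FASTA files to ./fasta/, records verified downloads in processed_files.json so an interrupted run resumes where it left off, and lists anything it could not fetch or verify in skipped_files.json.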
LA_path_ftp_folder.txt
This file should contain one NCBI FTP folder path per line, one line for each genome you want to download. List the paths without the https:// prefix (the script prepends it itself), and note that the last path segment is used as the accession to build the <accession>_genomic.fna filename.
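For illustration, a single line might look like the following; the path shown is a hypothetical example in the standard NCBI directory layout, not taken from the original gist:

ftp.ncbi.nlm.nih.gov/genomes/all/GCF/000/005/845/GCF_000005845.2_ASM584v2

One way to build such a list is from an NCBI assembly_summary.txt file. The sketch below is a minimal helper, assuming the usual layout of that file (comment lines starting with '#', a header line naming an ftp_path column, tab-separated rows); the function name and output filename are placeholders, not part of the original script.

import sys

def summary_to_path_list(summary_file, out_file="LA_path_ftp_folder.txt"):
    # Convert an NCBI assembly_summary.txt into one FTP folder path per line,
    # stripping the URL scheme because the download script re-adds https://.
    ftp_col = None
    with open(summary_file) as f, open(out_file, "w") as out:
        for line in f:
            if line.startswith("#"):
                header = line.lstrip("#").strip().split("\t")
                if "ftp_path" in header:
                    ftp_col = header.index("ftp_path")  # locate the ftp_path column
                continue
            if ftp_col is None:
                continue  # header not seen yet; cannot interpret data rows
            fields = line.rstrip("\n").split("\t")
            ftp_path = fields[ftp_col] if len(fields) > ftp_col else ""
            if ftp_path and ftp_path != "na":  # NCBI uses "na" for missing paths
                out.write(ftp_path.replace("https://", "").replace("ftp://", "") + "\n")

if __name__ == "__main__":
    summary_to_path_list(sys.argv[1])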