Created
January 9, 2025 13:25
-
-
Save thomastraum/df2a373d92d1618b848042e2b7a6e5ef to your computer and use it in GitHub Desktop.
backupemails.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Usage: python backupemails.py imap.example.com username password /path/to/backup/directory
import imaplib | |
import email | |
import os | |
import sys | |
import mailbox | |
import datetime | |
from tqdm import tqdm # For progress bars | |
import socket | |
import time | |
def get_all_mailboxes(mail):
    """Return the names of all selectable mailboxes, with INBOX placed first.

    Args:
        mail: An object exposing ``list()`` that returns a ``(status, entries)``
            pair, such as a logged-in ``imaplib`` connection.

    Returns:
        A list of mailbox names. Entries that fail to parse, carry the
        ``\\Noselect`` flag, or have an empty / ``.`` / ``..`` name are omitted.
    """
    _status, raw_entries = mail.list()

    # Dump the unparsed LIST lines first so parsing problems can be diagnosed.
    print("Debug: Found raw mailboxes:")
    for raw_entry in raw_entries:
        print(f"Raw mailbox info: {raw_entry}")

    selectable = []
    for raw_entry in raw_entries:
        flags, _delimiter, name = parse_mailbox_info(raw_entry)

        # Guard clauses: drop anything that cannot or should not be backed up.
        if flags is None or name is None:
            continue
        if b'\\Noselect' in flags:
            continue
        if not name or name in ('.', '..'):
            continue

        # INBOX jumps to the front; everything else keeps server order.
        position = 0 if name == 'INBOX' else len(selectable)
        selectable.insert(position, name)

    print("\nProcessed mailboxes:")
    for name in selectable:
        print(f"- {name}")
    return selectable
def parse_mailbox_info(mailbox_info):
    """Parse one line of an IMAP LIST response.

    A LIST line has the shape ``(<flags>) <delimiter> <name>`` where the
    delimiter is a quoted string or ``NIL`` and the name is either a bare
    atom or a quoted string, e.g. ``(\\HasNoChildren) "/" "Sent Items"``.

    Args:
        mailbox_info: The raw ``bytes`` line as returned by ``IMAP4.list()``.

    Returns:
        A ``(flags, delimiter, mailbox_name)`` tuple:

        * ``flags`` -- ``bytes``: the whole parenthesised flag list, so a
          membership test such as ``b'\\Noselect' in flags`` sees every flag.
        * ``delimiter`` -- ``str``: the hierarchy delimiter reported by the
          server, or ``"."`` when the server reports ``NIL`` or the line does
          not match the expected shape.
        * ``mailbox_name`` -- ``str``: the mailbox name with surrounding
          quotes removed and backslash escapes resolved.

        ``(None, None, None)`` is returned when the input cannot be parsed
        at all (wrong type, undecodable bytes).
    """
    import re  # local import: the file's import block does not include re

    def unquote(token):
        # Strip the surrounding quotes of a quoted string and resolve the
        # backslash escapes (\" and \\); bare atoms are returned untouched.
        if len(token) >= 2 and token.startswith('"') and token.endswith('"'):
            return re.sub(r'\\(.)', r'\1', token[1:-1])
        return token

    try:
        match = re.match(
            rb'\s*(?P<flags>\([^)]*\))\s+'
            rb'(?P<delim>"(?:[^"\\]|\\.)*"|NIL)\s+'
            rb'(?P<name>.*?)\s*$',
            mailbox_info,
            re.DOTALL,
        )
        if match:
            flags = match.group('flags')
            raw_delimiter = match.group('delim').decode('utf-8')
            # NIL means the server has no hierarchy; keep the historical "."
            # so the return value is always a string.
            delimiter = "." if raw_delimiter == 'NIL' else unquote(raw_delimiter)
            mailbox_name = unquote(match.group('name').decode('utf-8'))
        else:
            # Unexpected shape: fall back to the simple heuristics (first
            # space-separated token as flags, text after the last quote as
            # the name).
            flags = mailbox_info.split(b' ')[0]
            delimiter = "."
            mailbox_name = mailbox_info.decode('utf-8').split('"')[-1].strip(' ')

        # Clean up the mailbox name.
        # NOTE(review): dropping a leading '.' is kept from the previous
        # implementation; confirm the target server really needs it, since
        # the stripped name no longer matches what the server listed.
        if mailbox_name.startswith('.'):
            mailbox_name = mailbox_name[1:].strip()
        return flags, delimiter, mailbox_name
    except (TypeError, AttributeError, UnicodeDecodeError) as e:
        print(f"Error parsing mailbox info: {e}")
        return None, None, None
def reconnect_and_select(mail, imap_server, username, password, mailbox_name):
    """Open a fresh IMAP connection and select ``mailbox_name`` read-only.

    The existing connection is logged out on a best-effort basis, a new
    ``IMAP4_SSL`` connection is opened and authenticated, and several
    spellings of the mailbox name are tried in turn until one is accepted.

    Args:
        mail: The current connection; it is logged out and discarded.
        imap_server: Host name of the IMAP server.
        username: Login name.
        password: Login password.
        mailbox_name: Name of the mailbox to select.

    Returns:
        ``(mail, selected)`` where ``mail`` is the NEW connection (callers
        must replace their reference) and ``selected`` is ``True`` when one
        of the spellings was accepted by the server.

    Raises:
        Whatever ``imaplib.IMAP4_SSL`` or ``login`` raise when the new
        connection cannot be established or authenticated.
    """
    try:
        mail.logout()
    except Exception:
        # Best effort only: the old connection may already be dead. Catching
        # Exception (not a bare except) lets Ctrl-C / SystemExit through.
        pass

    mail = imaplib.IMAP4_SSL(imap_server)
    mail.login(username, password)

    # Inside an IMAP quoted string, backslash and double quote must be escaped.
    escaped_name = mailbox_name.replace('\\', '\\\\').replace('"', '\\"')

    # Try different selection methods; dict.fromkeys drops duplicates while
    # keeping order, so identical spellings cost only one round-trip.
    candidates = dict.fromkeys([
        mailbox_name,                    # Try exact name
        f'"{escaped_name}"',             # Try with quotes
        mailbox_name.strip('"'),         # Try without quotes
        mailbox_name.replace(' ', '_'),  # Try with underscores
    ])

    for candidate in candidates:
        try:
            result = mail.select(candidate, readonly=True)
            if result[0] == 'OK':
                print(f"Successfully selected mailbox using: {candidate}")
                return mail, True
            print(f"Selection returned: {result} for method: {candidate}")
        except Exception as e:
            print(f"Selection method failed for {candidate}: {e}")
            continue
    return mail, False
def backup_emails_mbox(imap_server, username, password, output_dir,
                       socket_timeout=60, max_retries=3, retry_delay=5):
    """Back up all mailboxes from an IMAP server to MBOX files.

    One ``<sanitized mailbox name>.mbox`` file is written into ``output_dir``
    per mailbox. A mailbox whose output file already exists is skipped.

    Args:
        imap_server: Host name of the IMAP server (connected to over SSL).
        username: Login name.
        password: Login password.
        output_dir: Directory for the ``.mbox`` files; created if missing.
        socket_timeout: Seconds passed to ``socket.setdefaulttimeout``. This
            is a process-wide setting.
        max_retries: Maximum attempts per message on connection errors.
        retry_delay: Seconds to sleep after a reconnect before retrying.

    Raises:
        Exception: Any non-IMAP error raised outside the per-mailbox handling
            is printed and re-raised. ``imaplib.IMAP4.error`` at that level
            is printed and swallowed.
    """
    # Set socket timeout
    socket.setdefaulttimeout(socket_timeout)

    mail = None
    try:
        # Connect to the IMAP server
        print(f"Connecting to {imap_server}...")
        mail = imaplib.IMAP4_SSL(imap_server)
        mail.login(username, password)

        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        # Get list of all mailboxes
        mailboxes = get_all_mailboxes(mail)
        print(f"Found {len(mailboxes)} mailboxes to backup")

        total_emails = 0
        for mailbox_name in mailboxes:
            try:
                print(f"\nProcessing mailbox: {mailbox_name}")

                # Create sanitized filename for the mailbox
                # NOTE(review): distinct names can collapse to the same file
                # name (e.g. "a.b" and "a_b"); the later one is then skipped
                # by the existence check below.
                safe_filename = "".join(c if c.isalnum() else '_' for c in mailbox_name)
                output_file = os.path.join(output_dir, f"{safe_filename}.mbox")

                # Skip if file already exists
                if os.path.exists(output_file):
                    print(f"Skipping {mailbox_name} - backup file already exists")
                    continue

                # Reconnect and select mailbox
                mail, selected = reconnect_and_select(mail, imap_server, username, password, mailbox_name)
                if not selected:
                    print(f"Could not select mailbox {mailbox_name}, skipping...")
                    continue

                # Get email count
                _, data = mail.search(None, "ALL")
                if not data or not data[0]:
                    print(f"No emails found in {mailbox_name}")
                    continue

                email_ids = data[0].split()
                print(f"Found {len(email_ids)} emails in {mailbox_name}")

                # Create mbox file
                mbox = mailbox.mbox(output_file)
                try:
                    mbox.lock()
                    # Set when the mailbox cannot be reselected after a
                    # reconnect, so the remaining messages really are skipped.
                    mailbox_lost = False

                    # Process emails with progress bar
                    for num in tqdm(email_ids, desc=f"Backing up {mailbox_name}"):
                        retry_count = 0
                        while retry_count < max_retries:
                            try:
                                _, data = mail.fetch(num, '(RFC822)')
                                if not data or data[0] is None:
                                    print(f"\nCould not fetch email {num}, skipping...")
                                    break
                                email_data = data[0][1]
                                if email_data:
                                    msg = email.message_from_bytes(email_data)
                                    mbox.add(msg)
                                    total_emails += 1
                                break  # Success, exit retry loop
                            except (socket.timeout, imaplib.IMAP4.abort, imaplib.IMAP4.error) as e:
                                retry_count += 1
                                if retry_count == max_retries:
                                    print(f"\nFailed to process email {num} after {max_retries} retries: {e}")
                                    break
                                print(f"\nConnection error, retrying ({retry_count}/{max_retries})...")
                                # Reconnect to server
                                mail, selected = reconnect_and_select(mail, imap_server, username, password, mailbox_name)
                                if not selected:
                                    print("Could not reselect mailbox, skipping remaining emails...")
                                    mailbox_lost = True
                                    break
                                time.sleep(retry_delay)
                            except Exception as e:
                                print(f"\nError processing email {num}: {e}")
                                break
                        if mailbox_lost:
                            # Without a selected mailbox every further fetch
                            # would fail, so stop iterating this mailbox.
                            break
                finally:
                    # Always try to properly close the mbox
                    try:
                        mbox.flush()
                        mbox.unlock()
                        mbox.close()
                    except Exception as e:
                        print(f"Error closing mbox file: {e}")
            except Exception as e:
                print(f"Error processing mailbox {mailbox_name}: {e}")
                continue

        print("\nBackup completed successfully!")
        print(f"Total emails backed up: {total_emails}")
        print(f"Backup files saved in: {output_dir}")
    except imaplib.IMAP4.error as e:
        print(f"IMAP error: {e}")
    except Exception as e:
        print(f"An error occurred: {e}")
        raise
    finally:
        # Log out on every exit path, including errors, so the connection is
        # not left open. Best effort: the connection may already be dead.
        if mail is not None:
            try:
                mail.logout()
            except Exception:
                pass
if __name__ == "__main__":
    # Exactly four positional arguments are required after the script name.
    # NOTE(review): a password passed on the command line is typically
    # visible in process listings and shell history; consider prompting for
    # it instead.
    if len(sys.argv) != 5:
        # Derive the script name from argv so the usage text cannot drift
        # from the real file name.
        script_name = os.path.basename(sys.argv[0])
        print(f"Usage: python {script_name} <imap_server> <username> <password> <output_directory>")
        sys.exit(1)

    imap_server, username, password, output_dir = sys.argv[1:5]
    backup_emails_mbox(imap_server, username, password, output_dir)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment