# Mail sanitizer - no deps
#
# Source: GitHub gist BharatKalluri/4ab94498d1d10a3c8ec2e6971efa6054
# Created: October 26, 2024 20:49
#
# NOTE: GitHub flagged this file as containing bidirectional Unicode text that
# may be interpreted or compiled differently than it appears. To review, open
# the file in an editor that reveals hidden Unicode characters.
import email
import getpass
import imaplib
import logging
import re
import webbrowser
from collections import Counter
from email.utils import parseaddr
# Constants
# Upper bound on how many of the most recent INBOX messages are analyzed.
MAX_EMAILS_TO_PROCESS = 1000

# Configure logging: timestamped (HH:MM:SS) progress messages at INFO level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    datefmt='%H:%M:%S',
)
class GmailAnalyzer:
    """Analyze a Gmail inbox over IMAP and bulk-delete mail by sender.

    The instance only stores credentials; every operation opens its own IMAP
    connection through ``connect`` and logs out again when it finishes.
    """

    # Pattern applied to the List-Unsubscribe header value.
    _HEADER_URL_PATTERN = r'https?://[^\s<>"]+|www\.[^\s<>"]+'

    # Patterns applied (case-insensitively, in order) to HTML bodies.
    _BODY_URL_PATTERNS = (
        r'https?://[^\s<>"]+(?:unsubscribe|opt[_-]out)[^\s<>"]*',
        r'https?://[^\s<>"]+(?:click\.notification)[^\s<>"]*',
    )

    def __init__(self, email_address, app_password):
        """Store the credentials used for every IMAP login."""
        self.email_address = email_address
        self.app_password = app_password

    def connect(self):
        """Create a fresh, logged-in IMAP-over-SSL connection to imap.gmail.com."""
        mail = imaplib.IMAP4_SSL("imap.gmail.com")
        mail.login(self.email_address, self.app_password)
        return mail

    @staticmethod
    def _first_literal(msg_data):
        """Return the first message literal of a FETCH response, or None."""
        for item in msg_data or ():
            # Literals arrive as (envelope, payload) tuples; other entries
            # (e.g. the closing b')') are plain bytes and are skipped.
            if isinstance(item, tuple) and len(item) > 1 and isinstance(item[1], bytes):
                return item[1]
        return None

    @staticmethod
    def _decode_part(part):
        """Decode a MIME part's payload to text without raising on bad bytes."""
        payload = part.get_payload(decode=True)
        if not payload:
            return ''
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='replace')
        except LookupError:
            # The message declared a charset Python does not know.
            return payload.decode('utf-8', errors='replace')

    @staticmethod
    def _with_scheme(url):
        """Prefix scheme-less ``www.`` matches so they open as web URLs."""
        if url.lower().startswith('www.'):
            return 'http://' + url
        return url

    def get_sender_statistics(self):
        """Analyze recent emails to get sender frequencies.

        Looks at the last ``MAX_EMAILS_TO_PROCESS`` messages of INBOX.

        Returns:
            A tuple ``(sender_counts, sender_emails)`` where ``sender_counts``
            is a list of ``(address, count)`` pairs ordered from most to least
            frequent, and ``sender_emails`` maps each address to a dict with
            keys ``'msg_num'``, ``'name'`` and ``'raw_data'`` describing the
            last processed message from that sender.
        """
        mail = self.connect()
        try:
            # Read-only select plus BODY.PEEK[] keep the analysis from marking
            # every inspected message as read.
            mail.select('INBOX', readonly=True)
            _, messages = mail.search(None, 'ALL')
            message_ids = messages[0].split() if messages and messages[0] else []
            message_ids = message_ids[-MAX_EMAILS_TO_PROCESS:]

            sender_counts = Counter()
            sender_emails = {}
            total_messages = len(message_ids)
            logging.info(f"Processing {total_messages} emails...")

            for i, msg_num in enumerate(message_ids, 1):
                if i % 10 == 0:
                    logging.info(f"Processed {i}/{total_messages} emails...")

                _, msg_data = mail.fetch(msg_num, '(BODY.PEEK[])')
                raw_data = self._first_literal(msg_data)
                if raw_data is None:
                    # Message vanished or the server sent no body; skip it.
                    continue

                email_message = email.message_from_bytes(raw_data)
                sender = email_message['from']
                if sender is None:
                    continue
                # str() also covers Header objects returned for non-ASCII headers.
                sender_name, sender_addr = parseaddr(str(sender))
                if sender_addr:
                    sender_counts[sender_addr] += 1
                    sender_emails[sender_addr] = {
                        'msg_num': msg_num,
                        'name': sender_name,
                        'raw_data': raw_data,
                    }

            logging.info("Email analysis complete!")
            return sender_counts.most_common(), sender_emails
        finally:
            # Always release the connection, even if a fetch or parse failed.
            mail.logout()

    def get_unsubscribe_link(self, raw_email_data):
        """Extract unsubscribe link from email data.

        The ``List-Unsubscribe`` header is checked first; if it holds no URL,
        every ``text/html`` part is scanned for links that look like
        unsubscribe / opt-out / click-notification URLs.

        Args:
            raw_email_data: The complete raw message as bytes.

        Returns:
            The first matching URL as a string, or ``None`` if none is found.
        """
        email_message = email.message_from_bytes(raw_email_data)

        list_unsubscribe = email_message.get('List-Unsubscribe')
        if list_unsubscribe:
            urls = re.findall(self._HEADER_URL_PATTERN, str(list_unsubscribe))
            if urls:
                return self._with_scheme(urls[0])

        for part in email_message.walk():
            if part.get_content_type() != "text/html":
                continue
            html_body = self._decode_part(part)
            for pattern in self._BODY_URL_PATTERNS:
                matches = re.findall(pattern, html_body, re.IGNORECASE)
                if matches:
                    return matches[0]
        return None

    def delete_emails_from_sender(self, sender_email):
        """Delete all emails from a specific sender using batch operations.

        Searches INBOX by UID for messages whose FROM matches
        ``sender_email``, flags them all ``\\Deleted`` in one STORE command
        and expunges the mailbox.
        """
        mail = self.connect()
        try:
            logging.info(f"Starting deletion of emails from {sender_email}...")
            mail.select('INBOX')

            # Escape backslashes and quotes so the address stays inside the
            # IMAP quoted string instead of altering the search command.
            quoted = sender_email.replace('\\', '\\\\').replace('"', '\\"')

            # Use UID SEARCH for more reliable message identification
            _, messages = mail.uid('SEARCH', None, f'FROM "{quoted}"')
            if not messages or not messages[0]:
                logging.info("No messages found to delete")
                return

            # Convert the space-separated string of UIDs into a list
            message_uids = messages[0].split()

            # Batch delete all messages at once
            uids_str = b','.join(message_uids)
            mail.uid('STORE', uids_str, '+FLAGS', r'(\Deleted)')
            mail.expunge()
            logging.info(f"Successfully deleted {len(message_uids)} emails from {sender_email}")
        finally:
            mail.logout()
def main():
    """Run the interactive inbox clean-up session.

    Prompts for Gmail credentials, verifies that login works, prints senders
    from most to least frequent, and for each one asks whether to delete all
    of that sender's mail. When the answer is yes and an unsubscribe link was
    found, the link is opened in the browser before deletion.
    """
    email_address = input("Enter your Gmail address: ").strip()
    # getpass keeps the app password from being echoed to the terminal.
    app_password = getpass.getpass("Enter your Gmail App Password: ")

    logging.info("Connecting to Gmail...")
    analyzer = GmailAnalyzer(email_address, app_password)

    # Test connection up front so bad credentials fail before the long scan.
    test_conn = analyzer.connect()
    test_conn.logout()

    logging.info(f"Analyzing your {MAX_EMAILS_TO_PROCESS} most recent emails...")
    sender_stats, sender_emails = analyzer.get_sender_statistics()

    print("\nMost frequent senders:")
    for i, (sender, count) in enumerate(sender_stats, 1):
        sender_info = sender_emails.get(sender, {})
        sender_name = sender_info.get('name', 'Unknown')
        print(f"\n{i}. {sender_name} <{sender}>: {count} emails")

        # Guard against a missing entry instead of raising KeyError.
        raw_data = sender_info.get('raw_data')
        unsubscribe_link = analyzer.get_unsubscribe_link(raw_data) if raw_data else None
        if unsubscribe_link:
            print(f" Unsubscribe link found: {unsubscribe_link[:50]} ...")
        else:
            print(" No unsubscribe link found")

        choice = input("Delete all emails from this sender? (Y/n): ")
        # Empty input means the default answer, which is "yes".
        if choice.strip().lower() in ('', 'y'):
            if unsubscribe_link:
                logging.info("Opening unsubscribe link in browser...")
                webbrowser.open(unsubscribe_link)
            analyzer.delete_emails_from_sender(sender)
# Run the interactive session only when executed as a script, not on import.
if __name__ == "__main__":
    main()
# (GitHub page footer: sign up for free or sign in to comment on this gist.)