Forked from benwattsjones/gmail_mbox_parser.py
Last active
September 29, 2023 18:27
-
-
Save AlbertoEAF/0ef8251d85d405237dd71b7598102ed9 to your computer and use it in GitHub Desktop.
Extension of gmail_mbox_parser.py to parse GMail's Google Takeout exports in .mbox format and do clustering analysis of senders so you can quickly triage which kind of information sources might no longer be relevant. It also adds command line arguments and exports the sender's statistics in a .csv file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
# -*- coding: utf-8 -*-
# About: Extension of gmail_mbox_parser.py to parse GMail's Google Takeout exports in .mbox format and do clustering analysis of senders so you can quickly triage which kind of information sources might no longer be relevant. It also adds command line arguments and exports the sender's statistics in a .csv file. | |
# Based on https://gist.github.com/benwattsjones/060ad83efd2b3afc8b229d41f9b246c4 but expanded to add command line arguments, do clustering of senders and export a .csv with those statistics. | |
import re | |
import argparse | |
import mailbox | |
from collections import Counter | |
import pandas as pd | |
import csv | |
import bs4 | |
def parse_sender(sender):
    """Split a raw ``From`` header value into a display name and an address.

    Values shaped like ``Name <address>`` are split on the angle brackets.
    When the name part is empty or contains only whitespace, the address is
    used as the name. Values without a ``<`` are used verbatim as both the
    name and the address.

    Args:
        sender: Raw sender string.

    Returns:
        A dict with the keys ``"sender"`` (the unmodified input),
        ``"sender_name"`` (whitespace-stripped) and ``"sender_email"``.

    Raises:
        ValueError: If ``sender`` contains ``<`` but does not match the
            ``Name <address>`` shape, or cannot be searched/matched at all
            (e.g. ``None``). The underlying error is chained as
            ``__cause__``.
    """
    try:
        if "<" in sender:
            sender_name, sender_email = re.match(
                r"^([^<]*)<([^>]+)>.*$", sender
            ).groups()
            # Strip before testing for emptiness so that a whitespace-only
            # name (" <a@b.c>") also falls back to the address.
            sender_name = sender_name.strip() or sender_email
        else:
            sender_name = sender_email = sender
    except (AttributeError, TypeError) as e:
        # AttributeError: re.match() returned None (no closing '>').
        # TypeError: ``sender`` is not something "in"/re.match can handle.
        raise ValueError(
            f"Couldn't parse sender: '{sender}'. BaseException=<{e}>"
        ) from e
    return {
        "sender": sender,
        "sender_name": sender_name.strip(),
        "sender_email": sender_email,
    }
def get_html_text(html):
    """Return the visible text of an HTML document's ``<body>``.

    The markup is parsed with BeautifulSoup using the ``lxml`` parser and the
    body's text fragments are joined with single spaces, each stripped of
    surrounding whitespace.

    Args:
        html: HTML markup to parse.

    Returns:
        The extracted text, or ``None`` when an ``AttributeError`` occurs
        while extracting (e.g. the parsed document has no body because the
        message contents are empty).
    """
    try:
        soup = bs4.BeautifulSoup(html, "lxml")
        body = soup.body
        return body.get_text(" ", strip=True)
    except AttributeError:  # message contents empty
        return None
class GmailMboxMessage:
    """Thin wrapper around one ``mailbox.mboxMessage``.

    The constructor copies a handful of headers onto attributes; the body is
    only extracted when :meth:`parse_email` is called.

    Attributes set by ``__init__``:
        email_data: The wrapped ``mailbox.mboxMessage``.
        email_labels: Value of the ``X-Gmail-Labels`` header (or ``None``).
        email_date: Value of the ``Date`` header (or ``None``).
        email_from: ``str()`` of the ``From`` header.
        email_to: ``str()`` of the ``To`` header.
        email_subject: Value of the ``Subject`` header (or ``None``).
        email_text: ``None`` until :meth:`parse_email` runs, then the list
            returned by :meth:`read_email_payload`.
    """

    def __init__(self, email_data):
        """Wrap ``email_data``.

        Raises:
            TypeError: If ``email_data`` is not a ``mailbox.mboxMessage``.
        """
        if not isinstance(email_data, mailbox.mboxMessage):
            raise TypeError("Variable must be type mailbox.mboxMessage")
        self.email_data = email_data
        self.email_labels = self.email_data["X-Gmail-Labels"]
        self.email_date = self.email_data["Date"]
        self.email_from = str(self.email_data["From"])
        self.email_to = str(self.email_data["To"])
        self.email_subject = self.email_data["Subject"]
        # Defined up front so the attribute exists even before parse_email().
        self.email_text = None

    def parse_email(self):
        """Extract the message body parts and store them in ``email_text``."""
        self.email_text = self.read_email_payload()

    def read_email_payload(self):
        """Return one ``(content_type, encoding, text)`` tuple per body part.

        Multipart messages are flattened to their leaf parts; a non-multipart
        message contributes its payload as a single entry.
        """
        email_payload = self.email_data.get_payload()
        if self.email_data.is_multipart():
            email_messages = list(self._get_email_messages(email_payload))
        else:
            email_messages = [email_payload]
        return [self._read_email_text(msg) for msg in email_messages]

    def _get_email_messages(self, email_payload):
        """Yield the leaf (non-multipart) parts found in ``email_payload``.

        Nested lists/tuples and nested multipart messages are walked
        recursively, in order.
        """
        for msg in email_payload:
            if isinstance(msg, (list, tuple)):
                yield from self._get_email_messages(msg)
            elif msg.is_multipart():
                yield from self._get_email_messages(msg.get_payload())
            else:
                yield msg

    def _read_email_text(self, msg):
        """Return ``(content_type, encoding, text)`` for one body part.

        ``msg`` is either a message part or a plain string (the payload of a
        non-multipart message). Strings are reported with content type and
        encoding ``"NA"`` and are passed through ``get_html_text``.
        Base64-encoded parts and parts that are neither ``text/plain`` nor
        ``text/html`` yield ``None`` as text. The encoding is returned exactly
        as it appears in the header.
        """
        if isinstance(msg, str):
            content_type = encoding = "NA"
        else:
            content_type = msg.get_content_type()
            encoding = msg.get("Content-Transfer-Encoding", "NA")
        # Transfer-encoding tokens are case-insensitive ("Base64", "BASE64"),
        # so compare on a lowercased copy.
        is_base64 = "base64" in str(encoding).lower()
        if "text/plain" in content_type and not is_base64:
            msg_text = msg.get_payload()
        elif "text/html" in content_type and not is_base64:
            msg_text = get_html_text(msg.get_payload())
        elif content_type == "NA":
            msg_text = get_html_text(msg)
        else:
            msg_text = None
        return (content_type, encoding, msg_text)
######################### End of library, example of use below | |
def parse_args():
    """Parse the script's command line (taken from ``sys.argv``).

    Options:
        ``-i`` / ``--input``: required.
        ``--exclude-senders``: zero or more values, defaults to ``[]``.
        ``--output-senders``: defaults to
            ``_generated_mbox_senders_summary.csv``.

    Returns:
        The ``argparse.Namespace`` with attributes ``input``,
        ``exclude_senders`` and ``output_senders``.
    """
    cli = argparse.ArgumentParser("GMail Inbox MBox parser")
    # (flags, keyword options) for each argument, in declaration order.
    option_specs = (
        (("-i", "--input"), {"required": True}),
        (
            ("--exclude-senders",),
            {
                "default": [],
                "nargs": "*",
                "help": "Optional. If passed, excludes such e-mails with such sender(s) from the analysis.",
            },
        ),
        (
            ("--output-senders",),
            {"default": "_generated_mbox_senders_summary.csv"},
        ),
    )
    for flags, options in option_specs:
        cli.add_argument(*flags, **options)
    return cli.parse_args()
def read_mailbox_mails(mbox, exclude_sender_patterns=None):
    """Wrap every message of ``mbox`` whose sender is not excluded.

    Each message is wrapped in a ``GmailMboxMessage``. A message is skipped
    when any of the exclusion patterns occurs as a substring of its ``From``
    value. Progress is printed every 100 messages.

    Args:
        mbox: Iterable, sized collection of ``mailbox.mboxMessage`` objects
            (``len()`` is called on it for the progress message).
        exclude_sender_patterns: Optional iterable of substrings; ``None``
            (the default) or an empty iterable excludes nothing.

    Returns:
        A list of ``GmailMboxMessage`` objects, in mailbox order.
    """
    # Snapshot the patterns once; also avoids a shared mutable default.
    patterns = tuple(exclude_sender_patterns or ())
    num_entries = len(mbox)
    emails = []
    for idx, email_obj in enumerate(mbox):
        if idx % 100 == 0:
            print(f"Parsing email {idx} of {num_entries}...")
        email = GmailMboxMessage(email_obj)
        if any(pattern in email.email_from for pattern in patterns):
            continue
        emails.append(email)
    return emails
def with_col_count(df, column):
    """Return ``df`` merged with a ``<column>_count`` column.

    The new column holds, for every row, the number of rows in ``df`` that
    share that row's value in ``column``.

    Args:
        df: Input DataFrame; it is not modified.
        column: Name of the column whose values are counted.

    Returns:
        A new DataFrame with the extra ``f"{column}_count"`` column.
    """
    count_label = f"{column}_count"
    occurrences = df.groupby(column).size()
    named_occurrences = occurrences.rename(count_label)
    # Join the per-value counts (indexed by value) back onto each row.
    return df.merge(named_occurrences, left_on=column, right_index=True)
def senders_summary(emails):
    """Build a per-sender statistics table from parsed e-mails.

    Every e-mail's ``email_from`` is parsed with ``parse_sender`` and three
    occurrence counts are attached (per address, per name and per raw sender
    string). Duplicate rows are dropped and the result is sorted by address
    count, then name count, both descending.

    Args:
        emails: Iterable of objects exposing an ``email_from`` string.

    Returns:
        A DataFrame with the columns ``sender``, ``sender_name``,
        ``sender_email``, ``sender_email_count``, ``sender_name_count`` and
        ``sender_count``. It is empty (but has those columns) when ``emails``
        is empty.
    """
    records = [parse_sender(email.email_from) for email in emails]
    if not records:
        # An empty DataFrame has no "sender_email" column to group by, so the
        # counting step would raise KeyError; return the empty table instead.
        return pd.DataFrame(
            columns=[
                "sender",
                "sender_name",
                "sender_email",
                "sender_email_count",
                "sender_name_count",
                "sender_count",
            ]
        )
    df = pd.DataFrame(records)
    for column in ("sender_email", "sender_name", "sender"):
        df = with_col_count(df, column)
    df = df.drop_duplicates()
    return df.sort_values(
        by=["sender_email_count", "sender_name_count"], ascending=False
    )
if __name__ == "__main__":
    # Script entry point: read the mbox given with --input, summarise its
    # senders, write the summary to the --output-senders CSV and print it.
    args = parse_args()
    print(f"Reading mbox file '{args.input}'...")
    mbox = mailbox.mbox(args.input)
    print()
    try:
        emails = read_mailbox_mails(
            mbox, exclude_sender_patterns=args.exclude_senders
        )
    finally:
        # Release the mailbox's file handle once all messages are read,
        # even when parsing fails.
        mbox.close()
    senders_summary_df = senders_summary(emails)
    senders_summary_df.to_csv(args.output_senders, index=False, quoting=csv.QUOTE_ALL)
    print("Email senders:")
    print(senders_summary_df)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment