Created
September 17, 2023 19:09
-
-
Save mdbecker/8c07de78e7ed815d0a18dc99f81fd691 to your computer and use it in GitHub Desktop.
Script to parse .mbox email files and export them into monthly-separated CSV files.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import mailbox | |
import csv | |
import email.utils | |
from collections import defaultdict | |
import argparse | |
from pathlib import Path | |
from bs4 import BeautifulSoup | |
from tqdm import tqdm | |
# Names of the message headers looked up on every parsed message.
# The Message-ID / In-Reply-To / References names double as CSV column titles.
DATE_FIELD = "Date"
FROM_FIELD = "From"
TO_FIELD = "To"
CC_FIELD = "Cc"
BCC_FIELD = "Bcc"
SUBJECT_FIELD = "Subject"
MSGID_FIELD = "Message-ID"
IN_REPLY_TO_FIELD = "In-Reply-To"
REFERENCES_FIELD = "References"

# Prefixes printed in front of the exception text when a field cannot be parsed.
ERROR_MSG_DATE = "Error parsing date"
ERROR_MSG_FROM = "Error parsing sender"
ERROR_MSG_TO = "Error parsing recipients"
ERROR_MSG_CC = "Error parsing CC"
ERROR_MSG_BCC = "Error parsing BCC"
ERROR_MSG_BODY = "Error parsing body"
def write_to_csv(data, filename):
    """Write *data* to *filename* as a CSV file with a fixed header row.

    Args:
        data: Iterable of rows; each row is a sequence whose items line up
            with the header columns written below.
        filename: Destination path (``str`` or path-like). Missing parent
            directories are created.

    Failures are reported on stdout and not raised, so one bad file does not
    abort the rest of an export.
    """
    header = [
        "Contact Time", "Recipient Name", "Recipient Email", "Recipient Domain",
        "Sender Email", "CC", "BCC", "Subject", "Body",
        MSGID_FIELD, IN_REPLY_TO_FIELD, REFERENCES_FIELD,
    ]
    try:
        target = Path(filename)
        # The output directory may not exist yet on a first run.
        target.parent.mkdir(parents=True, exist_ok=True)
        # Explicit UTF-8 keeps the output independent of the platform default
        # encoding; errors='replace' stops lone surrogates (left behind by
        # undecodable header bytes) from aborting the whole file.
        with target.open('w', newline='', encoding='utf-8', errors='replace') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(header)
            writer.writerows(data)
    except (OSError, csv.Error, UnicodeError) as e:
        print(f"Error writing file {filename}: {e}")
def safe_parse(field, parsing_function, error_message):
    """Apply *parsing_function* to *field*, converting any failure into ``None``.

    Args:
        field: The value handed to ``parsing_function``.
        parsing_function: One-argument callable that does the parsing.
        error_message: Prefix printed before the exception text on failure.

    Returns:
        Whatever ``parsing_function(field)`` returns, or ``None`` if it raised.
    """
    try:
        parsed = parsing_function(field)
    except Exception as exc:
        # Best-effort: report the problem and let the caller carry on.
        print(f"{error_message}: {exc}")
        parsed = None
    return parsed
def _decode_part_text(part):
    """Return *part*'s payload as text, undoing transfer encoding and charset."""
    raw = part.get_payload(decode=True)
    if raw is None:
        # Nothing decodable (e.g. an empty payload): hand back whatever is there.
        return part.get_payload()
    charset = part.get_content_charset() or 'utf-8'
    try:
        return raw.decode(charset, errors='replace')
    except LookupError:
        # The message names a charset Python does not know.
        return raw.decode('utf-8', errors='replace')


def _html_to_text(html):
    """Strip markup from *html*; print the error and return ``None`` on failure."""
    try:
        return BeautifulSoup(html, 'html.parser').get_text()
    except Exception as e:
        print(f"{ERROR_MSG_BODY}: {e}")
        return None


def get_body(message):
    """Extract a human-readable body from an email message.

    For a multipart message, the first non-attachment ``text/plain`` or
    ``text/html`` part found by ``message.walk()`` is used. HTML is reduced
    to its text content. Payloads are decoded from their
    Content-Transfer-Encoding (base64, quoted-printable) and charset, so the
    result is readable text instead of the raw encoded payload.

    Args:
        message: An ``email.message.Message`` (or subclass).

    Returns:
        The body text, or ``None`` when a multipart message has no usable
        text part or its HTML part cannot be parsed.
    """
    if not message.is_multipart():
        text = _decode_part_text(message)
        if message.get_content_type() == 'text/html':
            # Treat single-part HTML mail like an HTML part of a multipart
            # message, but keep the markup if stripping fails.
            stripped = _html_to_text(text)
            return text if stripped is None else stripped
        return text

    for part in message.walk():
        # A text attachment (e.g. a .txt file) is not the message body.
        if part.get_content_disposition() == 'attachment':
            continue
        content_type = part.get_content_type()
        if content_type == 'text/plain':
            return _decode_part_text(part)
        if content_type == 'text/html':
            return _html_to_text(_decode_part_text(part))
    return None
def _parse_address_header(raw_value, error_message):
    """Parse an address header into ``(name, email)`` pairs; ``[]`` if absent or unparsable."""
    if raw_value is None:
        # Header not present. Passing None on would be stringified by
        # getaddresses() and come back as the bogus address 'None'.
        return []
    parsed = safe_parse(raw_value, lambda value: email.utils.getaddresses([value]), error_message)
    return parsed if parsed is not None else []


def _decode_subject(raw_subject):
    """Decode RFC 2047 encoded words in a subject; fall back to the raw value on failure."""
    from email.header import decode_header, make_header

    if raw_subject is None:
        return None
    decoded = safe_parse(
        raw_subject,
        lambda value: str(make_header(decode_header(value))),
        "Error parsing subject",
    )
    return raw_subject if decoded is None else decoded


def parse_email(message):
    """Pull the exported fields out of one email message.

    Headers that are absent are not handed to the parsers: a missing date or
    sender yields ``None`` without an error being printed, and a missing
    To/Cc/Bcc yields an empty list.

    Args:
        message: An ``email.message.Message`` (or subclass).

    Returns:
        A 10-tuple ``(date, sender, recipients, cc, bcc, subject, body,
        msg_id, in_reply_to, references)`` where

        * ``date`` is a ``datetime`` or ``None`` (missing/unparsable),
        * ``sender`` is the bare address from the From header, or ``None``,
        * ``recipients``, ``cc`` and ``bcc`` are lists of ``(name, email)``
          pairs (possibly empty, never ``None``),
        * ``subject`` has encoded words decoded, or is ``None`` when absent,
        * ``body`` is whatever ``get_body`` returns,
        * the remaining items are the raw header values (``None`` if absent).
    """
    raw_date = message[DATE_FIELD]
    date = None
    if raw_date is not None:
        date = safe_parse(raw_date, email.utils.parsedate_to_datetime, ERROR_MSG_DATE)

    raw_sender = message[FROM_FIELD]
    sender = None
    if raw_sender is not None:
        sender = safe_parse(raw_sender, lambda value: email.utils.parseaddr(value)[1], ERROR_MSG_FROM)

    recipients = _parse_address_header(message[TO_FIELD], ERROR_MSG_TO)
    cc = _parse_address_header(message[CC_FIELD], ERROR_MSG_CC)
    bcc = _parse_address_header(message[BCC_FIELD], ERROR_MSG_BCC)

    subject = _decode_subject(message[SUBJECT_FIELD])
    body = get_body(message)
    msg_id = message[MSGID_FIELD]
    in_reply_to = message[IN_REPLY_TO_FIELD]
    references = message[REFERENCES_FIELD]
    return date, sender, recipients, cc, bcc, subject, body, msg_id, in_reply_to, references
def process_mbox(mbox_filename):
    """Export every message in an mbox file to per-month CSV files.

    One CSV row is produced per To-recipient of each message. Rows are
    grouped by the message's ``YYYY-MM`` and written to
    ``sent_mail/<YYYY-MM>.csv``; messages without a usable date are grouped
    under the key ``None`` (file ``sent_mail/None.csv``).

    Args:
        mbox_filename: Path (``str`` or path-like) of the mbox file to read.

    Problems opening the mailbox or creating the output directory are
    printed and end the export early; nothing is raised for them.
    """
    try:
        # create=False: by default mailbox.mbox silently creates a missing
        # file, so a mistyped path would yield an empty export instead of an
        # error.
        mbox = mailbox.mbox(mbox_filename, create=False)
    except Exception as e:
        print(f"Error reading mbox file {mbox_filename}: {e}")
        return

    monthly_data = defaultdict(list)
    try:
        for message in tqdm(mbox):
            (date, sender, recipients, cc, bcc, subject, body,
             msg_id, in_reply_to, references) = parse_email(message)
            month_key = date.strftime("%Y-%m") if date else None
            contact_time = date.strftime("%Y-%m-%d %H:%M:%S") if date else None
            # recipients is None when the To header could not be parsed; treat
            # that as "no recipients" rather than aborting the whole export.
            for name, address in recipients or ():
                domain = address.split('@')[-1] if '@' in address else ''
                monthly_data[month_key].append((
                    contact_time, name, address, domain, sender, cc, bcc,
                    subject, body, msg_id, in_reply_to, references,
                ))
    finally:
        mbox.close()

    if not monthly_data:
        return

    try:
        # The output directory is not guaranteed to exist.
        Path("sent_mail").mkdir(parents=True, exist_ok=True)
    except OSError as e:
        print(f"Error creating output directory sent_mail: {e}")
        return

    for month, data in monthly_data.items():
        write_to_csv(data, f"sent_mail/{month}.csv")
def main():
    """Command-line entry point: read the mbox path argument and run the export."""
    cli = argparse.ArgumentParser(description='Process mbox files into monthly CSVs.')
    cli.add_argument('mbox_path', type=str, help='The path to the mbox file')
    mbox_path = Path(cli.parse_args().mbox_path)
    process_mbox(mbox_path)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment