Created
March 13, 2024 01:53
-
-
Save pszemraj/9f57fd3fbe61165f1d3edd69f7550d69 to your computer and use it in GitHub Desktop.
Parse a directory of .eml files into a text DataFrame and save it to Parquet.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from email.parser import BytesParser | |
from pathlib import Path | |
import fire | |
import html2text | |
import pandas as pd | |
from tqdm import tqdm | |
# Configure the root logger: INFO and above, rendered as "LEVEL: message".
logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)
def _header_text(msg, name, default=None):
    """Return header ``name`` of ``msg`` as ``str``, or ``default`` if absent."""
    value = msg.get(name)
    # Headers containing raw 8-bit bytes may come back as Header objects
    # rather than str; normalise so callers can use string methods safely.
    return default if value is None else str(value)


def _decode_payload(part):
    """Return the transfer-decoded payload of ``part`` as text.

    The charset declared on the part is used; utf-8 is the fallback when no
    charset is declared or Python does not know the declared one.
    Undecodable bytes are dropped.
    """
    payload = part.get_payload(decode=True)
    if payload is None:  # multipart containers have no payload of their own
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        # The declared charset name is not a codec Python recognises.
        return payload.decode("utf-8", errors="ignore")


def _part_text(part):
    """Return the plain-text rendering of a text part, or None for other types.

    ``text/plain`` parts are returned as decoded text; ``text/html`` parts are
    decoded and then converted to text with ``html2text``.
    """
    ctype = part.get_content_type()
    if ctype == "text/plain":
        return _decode_payload(part)
    if ctype == "text/html":
        return html2text.html2text(_decode_payload(part))
    return None


def extract_email_data(file_path):
    """Extract email data from a file.

    This function reads an email file, parses its content and returns a
    dictionary with the following keys:

    * date: email date
    * from_name: sender display name ("" if the header has none)
    * from_address: sender email address ("" if there is no From header)
    * to: recipients
    * subject: email subject
    * cc: carbon copy recipients ("" if absent)
    * bcc: blind carbon copy recipients ("" if absent)
    * message: email body as text ("" if no text part is found)

    For multipart messages the first ``text/plain`` or ``text/html`` part
    that is not an attachment is used as the body. HTML bodies are converted
    to plain text.

    Parameters
    ----------
    file_path : str or path-like
        Path to the email file to parse.

    Returns
    -------
    email_data : dict
        Dictionary with email data.
    """
    # Local import keeps this function self-contained without touching the
    # module's import block.
    from email.utils import parseaddr

    with open(file_path, "rb") as f:
        msg = BytesParser().parse(f)

    body = ""
    if msg.is_multipart():
        for part in msg.walk():
            # get_content_disposition() parses and lower-cases the header, so
            # "Attachment" is recognised and filenames cannot cause a match.
            if part.get_content_disposition() == "attachment":
                continue
            text = _part_text(part)
            if text is not None:
                body = text
                break
    else:
        body = _part_text(msg) or ""

    # parseaddr copes with a missing header (given "" here), quoted display
    # names and bare addresses, none of which manual "<" splitting handles.
    from_name, from_address = parseaddr(_header_text(msg, "From", ""))

    return {
        "date": _header_text(msg, "Date"),
        "from_name": from_name,
        "from_address": from_address,
        "to": _header_text(msg, "To"),
        "subject": _header_text(msg, "Subject"),
        "cc": _header_text(msg, "Cc", ""),
        "bcc": _header_text(msg, "Bcc", ""),
        "message": body,
    }
def process_directory(input_dir, output_dir=None):
    """Process all .eml files in the given directory.

    Every ``*.eml`` file directly inside ``input_dir`` is parsed with
    ``extract_email_data``; the resulting rows are collected into a DataFrame
    and written to ``emails.parquet`` in ``output_dir``. Files that fail to
    parse are logged and skipped so one bad email does not abort the run.

    Parameters
    ----------
    input_dir : str or path-like
        Directory containing the .eml files.
    output_dir : str or path-like, optional
        Directory for the Parquet file; defaults to ``<input_dir>/output``.
        It is created if it does not exist.
    """
    # Local import: only needed to capture the DataFrame summary below.
    import io

    input_path = Path(input_dir)
    output_dir = Path(output_dir) if output_dir else input_path / "output"
    output_dir.mkdir(exist_ok=True, parents=True)

    logging.info(f"Processing emails in {input_dir}")
    # glob() yields files in arbitrary order; sort for a reproducible dataset.
    email_files = sorted(input_path.glob("*.eml"))

    data = []
    for file in tqdm(email_files, desc="Processing"):
        try:
            data.append(extract_email_data(file))
        except Exception as e:
            # Deliberate best-effort: report the failure and keep going.
            logging.error(f"Failed to process {file.name}: {e}")

    df = pd.DataFrame(data).convert_dtypes()
    output_file = output_dir / "emails.parquet"
    df.to_parquet(output_file, index=False)

    # DataFrame.info() writes to a buffer and returns None, so logging its
    # return value would log "None"; capture the summary text instead.
    summary = io.StringIO()
    df.info(buf=summary)
    logging.info(summary.getvalue())
    logging.info(f"Data saved to {output_file}")
def main(input_dir, output_dir=None):
    """Command-line entry point: convert a directory of emails to Parquet.

    Simply forwards ``input_dir`` and ``output_dir`` to ``process_directory``.
    """
    process_directory(input_dir=input_dir, output_dir=output_dir)
# Expose ``main`` through fire.Fire only when this file is run as a script.
if __name__ == "__main__":
    fire.Fire(main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment