Created
February 28, 2025 12:29
-
-
Save ecoopnet/eff110c30a085230f0b99d80c239c3af to your computer and use it in GitHub Desktop.
メールボックスファイル(*.mbox) をパースしてファイル展開するスクリプト
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import mailbox | |
import os | |
import email | |
import datetime | |
import re | |
import sys | |
from email.header import decode_header | |
def decode_str(s): | |
""" | |
メールヘッダーをデコードする関数 | |
""" | |
if s is None: | |
return "" | |
decoded_parts = [] | |
for part, encoding in decode_header(s): | |
if isinstance(part, bytes): | |
if encoding: | |
try: | |
decoded_parts.append(part.decode(encoding)) | |
except: | |
# エンコーディングが失敗した場合、UTF-8で試行 | |
try: | |
decoded_parts.append(part.decode('utf-8')) | |
except: | |
# それでも失敗した場合、エラー回避モードで | |
decoded_parts.append(part.decode('utf-8', errors='replace')) | |
else: | |
try: | |
decoded_parts.append(part.decode('utf-8')) | |
except: | |
decoded_parts.append(part.decode('utf-8', errors='replace')) | |
else: | |
decoded_parts.append(part) | |
return ''.join(decoded_parts) | |
def sanitize_filename(filename): | |
""" | |
ファイル名に使えない文字を置換する関数 | |
""" | |
# ファイル名に使えない文字を置換 | |
invalid_chars = r'[\\/*?:"<>|]' | |
return re.sub(invalid_chars, '_', filename) | |
def extract_date(msg): | |
""" | |
メッセージから日付情報を抽出し、YYMMdd_hhmmss形式に変換する関数 | |
""" | |
date_str = msg.get('Date') | |
if not date_str: | |
# 日付がない場合は現在時刻を使用 | |
now = datetime.datetime.now() | |
return now.strftime("%y%m%d_%H%M%S") | |
try: | |
# 様々な日付形式に対応するために複数のパターンを試行 | |
date_formats = [ | |
'%a, %d %b %Y %H:%M:%S %z', | |
'%a, %d %b %Y %H:%M:%S %Z', | |
'%d %b %Y %H:%M:%S %z', | |
'%a, %d %b %Y %H:%M:%S', | |
'%d %b %Y %H:%M:%S' | |
] | |
# 余分な情報(括弧内の情報など)を削除 | |
date_str = re.sub(r'\([^)]*\)', '', date_str).strip() | |
parsed_date = None | |
for fmt in date_formats: | |
try: | |
parsed_date = datetime.datetime.strptime(date_str, fmt) | |
break | |
except ValueError: | |
continue | |
if parsed_date: | |
return parsed_date.strftime("%y%m%d_%H%M%S") | |
else: | |
# どの形式でもパースできない場合はemail.utils.parsedate_tzを使用 | |
from email.utils import parsedate_to_datetime | |
try: | |
parsed_date = parsedate_to_datetime(date_str) | |
return parsed_date.strftime("%y%m%d_%H%M%S") | |
except: | |
# それでも失敗した場合は現在時刻を使用 | |
now = datetime.datetime.now() | |
return now.strftime("%y%m%d_%H%M%S") | |
except: | |
# エラーが発生した場合は現在時刻を使用 | |
now = datetime.datetime.now() | |
return now.strftime("%y%m%d_%H%M%S") | |
def process_mbox(mbox_path, output_dir): | |
""" | |
mboxファイルを処理して、メール内容と添付ファイルを保存する関数 | |
""" | |
# 出力ディレクトリの作成 | |
os.makedirs(output_dir, exist_ok=True) | |
# mboxファイルのオープン | |
mbox = mailbox.mbox(mbox_path) | |
print(f"処理を開始: {mbox_path}") | |
print(f"メール総数: {len(mbox)}") | |
# 各メッセージを処理 | |
for i, msg in enumerate(mbox): | |
try: | |
# 日付の抽出 | |
date_prefix = extract_date(msg) | |
# 件名の抽出 | |
subject = decode_str(msg.get('Subject', 'No Subject')) | |
print(f"処理中 [{i+1}/{len(mbox)}]: 日付={date_prefix}, 件名={subject}") | |
# ファイル名の作成と無効な文字の置換 | |
base_filename = sanitize_filename(f"{date_prefix}_{subject}") | |
# メール本文の保存 | |
content = "" | |
content += f"From: {decode_str(msg.get('From', 'Unknown'))}\n" | |
content += f"To: {decode_str(msg.get('To', 'Unknown'))}\n" | |
content += f"Subject: {subject}\n" | |
content += f"Date: {msg.get('Date', 'Unknown')}\n" | |
content += "-" * 40 + "\n\n" | |
# メール本文の抽出 | |
if msg.is_multipart(): | |
for part in msg.walk(): | |
content_type = part.get_content_type() | |
content_disposition = str(part.get("Content-Disposition")) | |
# 本文テキストの抽出 | |
if content_type == "text/plain" and "attachment" not in content_disposition: | |
try: | |
body = part.get_payload(decode=True) | |
charset = part.get_content_charset() | |
if charset: | |
body = body.decode(charset, errors='replace') | |
else: | |
body = body.decode('utf-8', errors='replace') | |
content += body + "\n" | |
except Exception as e: | |
content += f"[本文のデコードに失敗: {str(e)}]\n" | |
else: | |
# 非マルチパートメッセージの処理 | |
try: | |
body = msg.get_payload(decode=True) | |
charset = msg.get_content_charset() | |
if charset: | |
body = body.decode(charset, errors='replace') | |
else: | |
body = body.decode('utf-8', errors='replace') | |
content += body + "\n" | |
except Exception as e: | |
content += f"[本文のデコードに失敗: {str(e)}]\n" | |
# メール本文をファイルに保存 | |
output_file = os.path.join(output_dir, f"{base_filename}.txt") | |
with open(output_file, 'w', encoding='utf-8') as f: | |
f.write(content) | |
print(f" メール本文を保存: {os.path.basename(output_file)}") | |
# 添付ファイルの保存 | |
attachment_count = 0 | |
if msg.is_multipart(): | |
for part in msg.walk(): | |
if part.get_content_maintype() == 'multipart': | |
continue | |
filename = part.get_filename() | |
if filename: | |
attachment_count += 1 | |
# ファイル名のデコード | |
filename = decode_str(filename) | |
# ファイル名のサニタイズ | |
safe_filename = sanitize_filename(filename) | |
# 添付ファイルのパスを作成 | |
attachment_path = os.path.join(output_dir, f"{date_prefix}_{safe_filename}") | |
# 添付ファイルの内容を取得して保存 | |
with open(attachment_path, 'wb') as f: | |
f.write(part.get_payload(decode=True)) | |
print(f" 添付ファイルを保存: {os.path.basename(attachment_path)}") | |
if attachment_count == 0: | |
print(" 添付ファイルなし") | |
print("-" * 40) | |
except Exception as e: | |
print(f"エラー: メッセージの処理中に例外が発生しました: {str(e)}") | |
print(f"処理完了: 合計 {len(mbox)} 件のメールを処理しました") | |
def main(): | |
# コマンドライン引数の処理 | |
if len(sys.argv) != 2: | |
print(f"使用方法: {sys.argv[0]} <mbox_file>") | |
sys.exit(1) | |
mbox_path = sys.argv[1] | |
output_dir = "emails" | |
if not os.path.exists(mbox_path): | |
print(f"エラー: ファイルが見つかりません: {mbox_path}") | |
sys.exit(1) | |
process_mbox(mbox_path, output_dir) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment