Skip to content

Instantly share code, notes, and snippets.

@ecoopnet
Created February 28, 2025 12:29
Show Gist options
  • Save ecoopnet/eff110c30a085230f0b99d80c239c3af to your computer and use it in GitHub Desktop.
Save ecoopnet/eff110c30a085230f0b99d80c239c3af to your computer and use it in GitHub Desktop.
メールボックスファイル(*.mbox) をパースしてファイル展開するスクリプト
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import mailbox
import os
import email
import datetime
import re
import sys
from email.header import decode_header
def decode_str(s):
"""
メールヘッダーをデコードする関数
"""
if s is None:
return ""
decoded_parts = []
for part, encoding in decode_header(s):
if isinstance(part, bytes):
if encoding:
try:
decoded_parts.append(part.decode(encoding))
except:
# エンコーディングが失敗した場合、UTF-8で試行
try:
decoded_parts.append(part.decode('utf-8'))
except:
# それでも失敗した場合、エラー回避モードで
decoded_parts.append(part.decode('utf-8', errors='replace'))
else:
try:
decoded_parts.append(part.decode('utf-8'))
except:
decoded_parts.append(part.decode('utf-8', errors='replace'))
else:
decoded_parts.append(part)
return ''.join(decoded_parts)
def sanitize_filename(filename):
"""
ファイル名に使えない文字を置換する関数
"""
# ファイル名に使えない文字を置換
invalid_chars = r'[\\/*?:"<>|]'
return re.sub(invalid_chars, '_', filename)
def extract_date(msg):
"""
メッセージから日付情報を抽出し、YYMMdd_hhmmss形式に変換する関数
"""
date_str = msg.get('Date')
if not date_str:
# 日付がない場合は現在時刻を使用
now = datetime.datetime.now()
return now.strftime("%y%m%d_%H%M%S")
try:
# 様々な日付形式に対応するために複数のパターンを試行
date_formats = [
'%a, %d %b %Y %H:%M:%S %z',
'%a, %d %b %Y %H:%M:%S %Z',
'%d %b %Y %H:%M:%S %z',
'%a, %d %b %Y %H:%M:%S',
'%d %b %Y %H:%M:%S'
]
# 余分な情報(括弧内の情報など)を削除
date_str = re.sub(r'\([^)]*\)', '', date_str).strip()
parsed_date = None
for fmt in date_formats:
try:
parsed_date = datetime.datetime.strptime(date_str, fmt)
break
except ValueError:
continue
if parsed_date:
return parsed_date.strftime("%y%m%d_%H%M%S")
else:
# どの形式でもパースできない場合はemail.utils.parsedate_tzを使用
from email.utils import parsedate_to_datetime
try:
parsed_date = parsedate_to_datetime(date_str)
return parsed_date.strftime("%y%m%d_%H%M%S")
except:
# それでも失敗した場合は現在時刻を使用
now = datetime.datetime.now()
return now.strftime("%y%m%d_%H%M%S")
except:
# エラーが発生した場合は現在時刻を使用
now = datetime.datetime.now()
return now.strftime("%y%m%d_%H%M%S")
def process_mbox(mbox_path, output_dir):
"""
mboxファイルを処理して、メール内容と添付ファイルを保存する関数
"""
# 出力ディレクトリの作成
os.makedirs(output_dir, exist_ok=True)
# mboxファイルのオープン
mbox = mailbox.mbox(mbox_path)
print(f"処理を開始: {mbox_path}")
print(f"メール総数: {len(mbox)}")
# 各メッセージを処理
for i, msg in enumerate(mbox):
try:
# 日付の抽出
date_prefix = extract_date(msg)
# 件名の抽出
subject = decode_str(msg.get('Subject', 'No Subject'))
print(f"処理中 [{i+1}/{len(mbox)}]: 日付={date_prefix}, 件名={subject}")
# ファイル名の作成と無効な文字の置換
base_filename = sanitize_filename(f"{date_prefix}_{subject}")
# メール本文の保存
content = ""
content += f"From: {decode_str(msg.get('From', 'Unknown'))}\n"
content += f"To: {decode_str(msg.get('To', 'Unknown'))}\n"
content += f"Subject: {subject}\n"
content += f"Date: {msg.get('Date', 'Unknown')}\n"
content += "-" * 40 + "\n\n"
# メール本文の抽出
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
# 本文テキストの抽出
if content_type == "text/plain" and "attachment" not in content_disposition:
try:
body = part.get_payload(decode=True)
charset = part.get_content_charset()
if charset:
body = body.decode(charset, errors='replace')
else:
body = body.decode('utf-8', errors='replace')
content += body + "\n"
except Exception as e:
content += f"[本文のデコードに失敗: {str(e)}]\n"
else:
# 非マルチパートメッセージの処理
try:
body = msg.get_payload(decode=True)
charset = msg.get_content_charset()
if charset:
body = body.decode(charset, errors='replace')
else:
body = body.decode('utf-8', errors='replace')
content += body + "\n"
except Exception as e:
content += f"[本文のデコードに失敗: {str(e)}]\n"
# メール本文をファイルに保存
output_file = os.path.join(output_dir, f"{base_filename}.txt")
with open(output_file, 'w', encoding='utf-8') as f:
f.write(content)
print(f" メール本文を保存: {os.path.basename(output_file)}")
# 添付ファイルの保存
attachment_count = 0
if msg.is_multipart():
for part in msg.walk():
if part.get_content_maintype() == 'multipart':
continue
filename = part.get_filename()
if filename:
attachment_count += 1
# ファイル名のデコード
filename = decode_str(filename)
# ファイル名のサニタイズ
safe_filename = sanitize_filename(filename)
# 添付ファイルのパスを作成
attachment_path = os.path.join(output_dir, f"{date_prefix}_{safe_filename}")
# 添付ファイルの内容を取得して保存
with open(attachment_path, 'wb') as f:
f.write(part.get_payload(decode=True))
print(f" 添付ファイルを保存: {os.path.basename(attachment_path)}")
if attachment_count == 0:
print(" 添付ファイルなし")
print("-" * 40)
except Exception as e:
print(f"エラー: メッセージの処理中に例外が発生しました: {str(e)}")
print(f"処理完了: 合計 {len(mbox)} 件のメールを処理しました")
def main():
# コマンドライン引数の処理
if len(sys.argv) != 2:
print(f"使用方法: {sys.argv[0]} <mbox_file>")
sys.exit(1)
mbox_path = sys.argv[1]
output_dir = "emails"
if not os.path.exists(mbox_path):
print(f"エラー: ファイルが見つかりません: {mbox_path}")
sys.exit(1)
process_mbox(mbox_path, output_dir)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment