Created
January 11, 2018 11:37
-
-
Save Ivlyth/25576219140a942824dd37858f0fef68 to your computer and use it in GitHub Desktop.
email parser - extract mail content and attachment
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding:utf8 -*-
"""
Author : Myth
Date : 18/1/11
Email : email4myth at gmail.com
"""
import email | |
import email.header | |
import hashlib | |
import chardet | |
def choose_alternative_part(subparts):
    """Pick the single part to keep from a group of alternative parts.

    The first part whose content subtype is ``html`` is preferred; when no
    part is HTML, the first part of the group is returned.

    :param subparts: non-empty iterable of message parts, each exposing
        ``get_content_subtype()``.
    :return: the preferred part (one of the objects in *subparts*).
    :raises IndexError: if *subparts* is empty.
    """
    candidates = list(subparts)
    # Resolve the fallback up front so an empty group always fails with
    # IndexError before any part is inspected.
    fallback = candidates[0]
    for part in candidates:
        if part.get_content_subtype() == 'html':
            return part
    return fallback
def walk(mail):
    """Yield the leaf (non-multipart) parts of a message in document order.

    Multipart containers are never yielded themselves. For a
    ``multipart/alternative`` container only the part selected by
    ``choose_alternative_part`` is descended into; for every other
    multipart container all sub-parts are visited in order.

    :param mail: a message object exposing ``is_multipart()``,
        ``get_payload()`` and ``get_content_type()``.
    :return: generator of leaf message parts.
    """
    # Explicit stack instead of recursion; the top of the stack is always
    # the next part in document order.
    pending = [mail]
    while pending:
        node = pending.pop()
        if not node.is_multipart():
            yield node
            continue

        children = node.get_payload()
        kind = node.get_content_type()
        if kind and kind.lower() == 'multipart/alternative':
            pending.append(choose_alternative_part(children))
        else:
            # Push in reverse so the first child is popped first.
            pending.extend(reversed(children))
def decode_header(header):
    """Decode a (possibly RFC 2047 encoded) header value into text.

    All double quotes are stripped from the raw value first. Each chunk
    returned by ``email.header.decode_header`` is decoded with
    ``decode_str`` and the chunks are joined with a single space.

    :param header: raw header value (string).
    :return: the decoded header text; an empty string when nothing decodes.
    """
    unquoted = header.replace('"', '')
    pieces = [
        decode_str(raw, charset)
        for raw, charset in email.header.decode_header(unquoted)
    ]

    # A separator is only emitted once some text has been collected, so
    # empty chunks at the front contribute nothing (not even a space).
    first = 0
    while first < len(pieces) and not pieces[first]:
        first += 1
    return ' '.join(pieces[first:])
def is_attachment(msg):
    """Report whether a message part should be treated as an attachment.

    A part counts as an attachment exactly when ``msg.get_filename()``
    returns a truthy value.

    :param msg: message part exposing ``get_filename()``.
    :return: ``True`` if the part has a filename, ``False`` otherwise.
    """
    filename = msg.get_filename()
    return True if filename else False
def split_letter_and_attach(msgs):
    """Partition message parts into body text parts and attachments.

    Parts for which ``is_attachment`` is true go to the attachment list.
    Of the remaining parts only those whose main content type is ``text``
    are kept as letters; every other non-attachment part is dropped.

    :param msgs: iterable of message parts.
    :return: tuple ``(letters, attachments)`` of two lists, each preserving
        the input order.
    """
    text_parts = []
    file_parts = []
    for part in msgs:
        if is_attachment(part):
            file_parts.append(part)
            continue
        # For non-attachments, keep only text-type content.
        if part.get_content_maintype().lower() == 'text':
            text_parts.append(part)
    return text_parts, file_parts
def decode_str(s, charset, errors='ignore'):
    """Decode a byte string to text, guessing the charset when none is given.

    :param s: value to decode. Falsy values (``None``, empty strings) and
        values that are already text are returned unchanged.
    :param charset: codec name to decode with. When falsy, the charset is
        detected with ``chardet``; if detection yields nothing, UTF-8 is
        used. A name Python has no codec for also falls back to UTF-8.
    :param errors: error handler passed to ``decode``; the default
        ``'ignore'`` silently drops undecodable bytes.
    :return: the decoded text, or *s* itself in the pass-through cases.
    """
    if not s:
        return s
    # type(u'') is ``unicode`` on Python 2 and ``str`` on Python 3, so this
    # check does not depend on the Python 2-only ``unicode`` builtin.
    if isinstance(s, type(u'')):
        return s

    if not charset:
        guess = chardet.detect(s)
        # e.g. {'confidence': 0.99, 'encoding': 'GB2312'}
        if guess:
            charset = guess.get('encoding')
    if not charset:
        charset = 'utf-8'

    try:
        return s.decode(charset, errors)
    except LookupError:
        # The charset usually comes from an untrusted mail header (or from
        # detection) and may name a codec Python does not know. Decode as
        # UTF-8 instead of aborting the whole parse.
        return s.decode('utf-8', errors)
def merge_letters(letters):
    """Concatenate the decoded text of all body parts into one string.

    Each part's payload is transfer-decoded, converted to text with
    ``decode_str`` using the part's declared charset, and followed by a
    blank-line separator (``'\\r\\n\\r\\n'``). Parts with an empty payload
    are skipped entirely.

    :param letters: iterable of message parts exposing ``get_payload()``
        and ``get_content_charset()``.
    :return: the merged text; an empty text string when nothing was added.
    """
    chunks = []
    for letter in letters:
        raw = letter.get_payload(decode=True)
        if raw:
            text = decode_str(raw, letter.get_content_charset())
            chunks.append(text + '\r\n\r\n')
    return u''.join(chunks)
def content_md5(content):
    """Return the hexadecimal MD5 digest of *content*.

    :param content: byte string to hash.
    :return: 32-character lowercase hex digest.
    """
    digest = hashlib.md5()
    digest.update(content)
    return digest.hexdigest()
def save_attachment(attachment):
    """Build an info record for one attachment part.

    Despite the name, nothing is written anywhere: the function only
    collects the attachment's data into a dictionary.

    :param attachment: message part exposing ``get_payload()`` and
        ``get_filename()``.
    :return: dict with keys ``'filename'`` (decoded via ``decode_header``),
        ``'md5'`` (hex digest of the decoded payload) and ``'content'``
        (the transfer-decoded payload).
    """
    payload = attachment.get_payload(decode=True)
    return {
        'filename': decode_header(attachment.get_filename()),
        'md5': content_md5(payload),
        'content': payload,
    }
def parse_raw_mail(raw_mail, truncated):
    """Parse a raw mail string into its text content and attachments.

    :param raw_mail: the raw message text; leading whitespace is stripped
        before parsing.
    :param truncated: when truthy and the last leaf part of the message is
        an attachment, that part is discarded as incomplete.
    :return: dict with ``'content'`` (merged text of all text parts) and
        ``'attachments'`` (list of records from ``save_attachment``). If
        the message cannot be parsed, or has no leaf parts, ``'content'``
        is ``''`` and ``'attachments'`` is ``[]``.
    """
    try:
        mail = email.message_from_string(raw_mail.lstrip())
    except Exception as exc:
        # Best effort: report the problem and hand back an empty result.
        print("error happened when get message from str: %s" % exc)
        return {'content': '', 'attachments': []}

    parts = list(walk(mail))
    if not parts:
        return {'content': '', 'attachments': []}

    # Ignore the last attachment when the mail is known to be truncated.
    if truncated and is_attachment(parts[-1]):
        del parts[-1]

    letters, attachments = split_letter_and_attach(parts)
    merged_text = merge_letters(letters)
    attachment_records = [save_attachment(item) for item in attachments]
    return {
        'content': merged_text,
        'attachments': attachment_records,
    }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment