Skip to content

Instantly share code, notes, and snippets.

@Ivlyth
Created January 11, 2018 11:37
Show Gist options
  • Save Ivlyth/25576219140a942824dd37858f0fef68 to your computer and use it in GitHub Desktop.
Save Ivlyth/25576219140a942824dd37858f0fef68 to your computer and use it in GitHub Desktop.
email parser - extract mail content and attachment
# -*- coding:utf8 -*-
"""
Author : Myth
Date : 18/1/11
Email : email4myth at gmail.com
"""
import email
import email.header
import hashlib
import chardet
def choose_alternative_part(subparts):
    """Pick the preferred rendering from the parts of a multipart/alternative.

    The first part whose content subtype is ``html`` wins; when no part is
    HTML the first part is returned.

    :param subparts: list of message parts exposing ``get_content_subtype()``.
    :return: the selected part.
    :raises IndexError: if ``subparts`` is empty.
    """
    for candidate in subparts:
        if candidate.get_content_subtype() == 'html':
            return candidate
    # No HTML rendering available: fall back to the first alternative.
    return subparts[0]
def walk(mail):
    """Yield the leaf (non-multipart) parts of ``mail`` in document order.

    A ``multipart/alternative`` container contributes only the part picked
    by ``choose_alternative_part``; every other multipart container is
    descended into part by part.

    :param mail: a message object exposing ``is_multipart()``,
        ``get_payload()`` and ``get_content_type()``.
    :return: generator of non-multipart message parts.
    """
    if not mail.is_multipart():
        yield mail
        return

    children = mail.get_payload()
    kind = mail.get_content_type()
    if kind and kind.lower() == 'multipart/alternative':
        # Only one of the alternatives is walked.
        children = [choose_alternative_part(children)]

    for child in children:
        for leaf in walk(child):
            yield leaf
def decode_header(header):
    """Decode a MIME-encoded header value into a single string.

    Double quotes are removed from ``header`` first. Each ``(value, charset)``
    chunk returned by ``email.header.decode_header`` is decoded with
    ``decode_str`` and the chunks are joined with a single space.

    :param header: raw header value (a string).
    :return: the decoded header text.
    """
    unquoted = header.replace('"', '')
    decoded = ''
    for chunk, charset in email.header.decode_header(unquoted):
        text = decode_str(chunk, charset)
        # A separator is only inserted once something has been accumulated.
        decoded = decoded + ' ' + text if decoded else decoded + text
    return decoded
def is_attachment(msg):
    """Tell whether a message part counts as an attachment.

    A part is treated as an attachment exactly when ``msg.get_filename()``
    returns a non-empty value.

    :param msg: a message part exposing ``get_filename()``.
    :return: ``True`` or ``False``.
    """
    filename = msg.get_filename()
    return True if filename else False
def split_letter_and_attach(msgs):
    """Partition message parts into body texts and attachments.

    :param msgs: iterable of message parts.
    :return: tuple ``(letters, attachments)`` of two lists. Parts for which
        ``is_attachment`` is true go to ``attachments``; of the remaining
        parts only those whose main content type is ``text`` go to
        ``letters``. Everything else is dropped.
    """
    letters, attachments = [], []
    for part in msgs:
        if is_attachment(part):
            bucket = attachments
        elif part.get_content_maintype().lower() == 'text':
            # For non-attachments, keep text content only.
            bucket = letters
        else:
            continue
        bucket.append(part)
    return letters, attachments
def decode_str(s, charset, errors='ignore'):
    """Decode a byte string to text, guessing the charset when necessary.

    :param s: value to decode. Falsy values and values that are already
        text are returned unchanged.
    :param charset: declared charset name, or a falsy value when unknown.
    :param errors: error handler passed to ``decode`` (default ``'ignore'``).
    :return: the decoded text.

    When ``charset`` is missing, or names a codec Python does not know, the
    charset is guessed with ``chardet.detect`` and falls back to UTF-8 when
    detection yields nothing.
    """
    if not s:
        return s
    # type(u'') is ``unicode`` on Python 2 and ``str`` on Python 3, so this
    # "already text" check works on both without naming ``unicode``.
    if isinstance(s, type(u'')):
        return s
    if charset:
        try:
            return s.decode(charset, errors)
        except LookupError:
            # Unknown codec name: treat the charset as undeclared and
            # fall through to detection instead of failing.
            pass
    # chardet.detect returns e.g. {'confidence': 0.99, 'encoding': 'GB2312'}
    guess = chardet.detect(s) or {}
    return s.decode(guess.get('encoding') or 'utf-8', errors)
def merge_letters(letters):
    """Concatenate the decoded payloads of the given text parts.

    Each part's payload is fetched with ``get_payload(decode=True)`` and
    decoded with ``decode_str`` using the part's declared content charset.
    Parts with an empty payload are skipped; every kept payload is followed
    by a blank-line separator (``'\\r\\n\\r\\n'``).

    :param letters: iterable of message parts.
    :return: the merged text (empty when nothing was kept).
    """
    pieces = []
    for letter in letters:
        raw = letter.get_payload(decode=True)
        if raw:
            pieces.append(decode_str(raw, letter.get_content_charset()))
            pieces.append('\r\n\r\n')
    return u''.join(pieces)
def content_md5(content):
    """Return the hexadecimal MD5 digest of ``content``.

    :param content: the data to hash, passed as-is to ``hashlib.md5``.
    :return: the digest as a hex string.
    """
    digest = hashlib.md5()
    digest.update(content)
    return digest.hexdigest()
def save_attachment(attachment):
    """Build a record describing one attachment part.

    :param attachment: a message part with a filename.
    :return: dict with keys ``'filename'`` (the part's filename passed
        through ``decode_header``), ``'md5'`` (``content_md5`` of the
        payload) and ``'content'`` (the payload from
        ``get_payload(decode=True)``).
    """
    payload = attachment.get_payload(decode=True)
    info = {'content': payload}
    info['filename'] = decode_header(attachment.get_filename())
    info['md5'] = content_md5(payload)
    return info
def parse_raw_mail(raw_mail, truncated):
    """Parse a raw mail message into body text and attachment records.

    :param raw_mail: message source as a string; leading whitespace is
        stripped before parsing.
    :param truncated: truthy when ``raw_mail`` was cut short; if the last
        part is an attachment it is then ignored.
    :return: dict with ``'content'`` (text merged by ``merge_letters``) and
        ``'attachments'`` (list of dicts built by ``save_attachment``).
        Both are empty when parsing fails or the message yields no parts.
    """
    try:
        mail = email.message_from_string(raw_mail.lstrip())
    except Exception as e:
        # Best effort: report the problem and return an empty result.
        # Calling print with one pre-formatted argument emits the same text
        # on Python 2 and 3; a bare print statement is a SyntaxError on 3.
        print("error happened when get message from str: %s" % e)
        return {
            'content': '',
            'attachments': []
        }

    msgs = list(walk(mail))
    if not msgs:
        return {
            'content': '',
            'attachments': []
        }

    if truncated and is_attachment(msgs[-1]):  # ignore last truncated attachment
        msgs.pop()

    letters, attachments = split_letter_and_attach(msgs)
    return {
        'content': merge_letters(letters),
        'attachments': [save_attachment(attach) for attach in attachments]
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment