Created
October 13, 2022 04:39
-
-
Save thewisenerd/cc05eae6f842d15dffa501b6040516ea to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import email | |
import email.header | |
import email.message | |
import email.utils | |
import hashlib | |
import json | |
import os | |
import quopri | |
import sys | |
import typing | |
import cgi | |
# Directory in which main() creates the rendered "<md5 of input name>.html" file.
html_path = "/tmp"
def decode(
    message: email.message.Message,
    depth: int = 0,
    ctr: int = 0,
) -> tuple[str, str, list[tuple[str, str, str]]]:
    """Recursively collect the text body, HTML body and attachments of *message*.

    Args:
        message: The message (or MIME sub-part) to walk.
        depth: Nesting level of *message* in the MIME tree; only used to build
            a fallback attachment name.
        ctr: Index of *message* among its siblings; only used to build a
            fallback attachment name.

    Returns:
        A ``(text, html, attachments)`` tuple. ``text`` and ``html`` are the
        concatenated, transfer-decoded ``text/plain`` and ``text/html`` bodies
        of every part that is not an attachment. Each attachment is
        ``(file_name, content_type, base64_payload)``; an attachment that is
        not base64-encoded is reported as ``(file_name, 'text/plain', "")`` so
        that its name is still listed without a payload.
    """
    text, html = "", ""
    attachments: list[tuple[str, str, str]] = []

    payload = message.get_payload()

    # Multipart container: merge whatever the children yield.
    if isinstance(payload, list):
        for idx, part in enumerate(payload):
            part_text, part_html, part_attachments = decode(part, depth=depth + 1, ctr=idx)
            text += part_text
            html += part_html
            attachments.extend(part_attachments)
        return text, html, attachments

    if not isinstance(payload, str):
        return text, html, attachments

    content_type = message.get_content_type()

    if message.get_content_disposition() == 'attachment':
        # get_filename() replaces cgi.parse_header (the cgi module was removed
        # in Python 3.13) and also understands RFC 2231 encoded parameters.
        file_name = message.get_filename() or str((depth * 100) + ctr)
        # Header values are case-insensitive ("base64", "BASE64", ...).
        encoding = str(message.get('content-transfer-encoding', '')).strip().lower()
        if encoding == 'base64':
            # Drop every kind of whitespace (including "\r") so the data can
            # be embedded in a data: URI as-is.
            attachments.append((file_name, content_type, ''.join(payload.split())))
        else:
            # make sure we get the filename out right, do not muddy payload
            attachments.append((file_name, 'text/plain', ""))
        # An attachment is never part of the displayed body.
        return text, html, attachments

    if content_type == 'text/plain':
        text += _decode_text_part(message)
    elif content_type == 'text/html':
        html += _decode_text_part(message)

    return text, html, attachments


def _decode_text_part(part: email.message.Message) -> str:
    """Return the body of a non-multipart text part as ``str``."""
    # decode=True undoes the declared Content-Transfer-Encoding
    # (quoted-printable, base64, ...) instead of assuming one.
    raw = part.get_payload(decode=True)
    if raw is None:
        return ""
    charset = part.get_content_charset() or 'utf-8'
    try:
        return raw.decode(charset, errors='replace')
    except LookupError:
        # Unknown charset label in the message: fall back instead of crashing.
        return raw.decode('utf-8', errors='replace')
def extract(msg_file: typing.BinaryIO, file_name: str) -> dict:
    """Parse the message read from *msg_file* into a plain dictionary.

    Args:
        msg_file: Binary file object positioned at the start of the message.
        file_name: Name of the source file. Currently unused; kept so existing
            callers keep working.

    Returns:
        A dict with the keys ``date``, ``subject`` (decoded strings, or
        ``None`` when the header is absent), ``from``, ``to``, ``cc`` (lists
        of ``(realname, address)`` pairs, empty when the header is absent),
        ``text``, ``html``, ``attachments`` (as returned by ``decode``) and
        ``message-id`` (``""`` when the header is absent).
    """
    msg: email.message.Message = email.message_from_binary_file(msg_file)

    def header_text(name: str) -> typing.Optional[str]:
        # Decode RFC 2047 encoded words and always hand back a plain str so
        # the value can be serialised later.
        raw = msg.get(name)
        if raw is None:
            return None
        try:
            return str(email.header.make_header(email.header.decode_header(raw)))
        except (LookupError, UnicodeError):
            # Unknown charset or undecodable bytes: keep the raw form.
            return str(raw)

    def addresses(name: str) -> list:
        # get_all() returns None for a missing header, which getaddresses()
        # cannot iterate; pass an empty list as the default instead.
        return email.utils.getaddresses(msg.get_all(name, []))

    result = {
        'date': header_text('date'),
        'subject': header_text('subject'),
        'from': addresses('from'),
        'to': addresses('to'),
        'cc': addresses('cc'),
    }

    text, html, attachments = decode(msg)
    result['text'] = text
    result['html'] = html
    result['attachments'] = attachments

    # getaddresses() strips the angle brackets around the id.
    message_ids = addresses('message-id')
    result['message-id'] = message_ids[0][1] if message_ids else ""

    return result
def write_header(result: dict, key: str, out: typing.TextIO):
    """Write one two-cell table row ``key | result[key]`` to *out*.

    Args:
        result: Mapping that may contain *key*; a missing key is rendered as
            the text ``None``.
        key: Name shown in the first cell and looked up in *result*.
        out: Text stream that receives the ``<tr>...</tr>`` markup.
    """
    # Function-scope import so this block does not depend on a file-level one.
    import html

    cell_open = '<td style="border: 1px solid black; padding: 4px;">'
    value = result.get(key)

    out.write('<tr>')
    # Both cells are escaped: the values originate from the parsed message and
    # must not be interpreted as markup.
    out.write(cell_open + '<strong>' + html.escape(key) + '</strong></td>')
    out.write(cell_open + html.escape(str(value)) + '</td>')
    out.write('</tr>')
def write_result(result: dict, out: typing.TextIO):
    """Render *result* as an HTML fragment on *out*.

    The output is a table with the ``date``, ``subject``, ``from``, ``to`` and
    ``cc`` rows, one row per attachment (a ``data:`` download link), an
    ``<hr>``, and then the message body: the HTML body when there is one,
    otherwise the plain-text body.

    Args:
        result: Dict providing at least the ``attachments``, ``text`` and
            ``html`` keys; ``attachments`` holds
            ``(filename, content_type, base64_content)`` tuples.
        out: Text stream that receives the markup.
    """
    # Function-scope import of just the function: "html" is a key/variable
    # name in this file, so the module name is not bound here.
    from html import escape

    cell_open = '<td style="border: 1px solid black; padding: 4px;">'

    out.write('<table style="border-collapse: collapse;">')
    for key in ('date', 'subject', 'from', 'to', 'cc'):
        write_header(result, key, out)

    for number, (filename, content_type, b64_content) in enumerate(result['attachments'], start=1):
        # File name and type come from the message; escape them before they
        # land inside attributes or text.
        safe_name = escape(str(filename))
        out.write('<tr>')
        out.write(cell_open + '<strong>Attachment' + str(number) + '</strong></td>')
        out.write(cell_open)
        out.write('<a href="data:{};base64,{}" download="{}">{}</a>'.format(
            escape(str(content_type)), escape(str(b64_content)), safe_name, safe_name))
        out.write('</td>')
        out.write('</tr>')  # close the row
    out.write('</table>')
    out.write('<hr>')

    html_body = result['html']
    if html_body:
        # NOTE(review): the HTML body is written exactly as found in the
        # message (that is the point of the viewer) and is therefore
        # unsanitised; only open the output where that is acceptable.
        out.write(html_body)
    else:
        # Plain text is not markup: escape it and keep its line breaks.
        out.write('<pre style="white-space: pre-wrap;">' + escape(result['text']) + '</pre>')
def main(file_path: str):
    """Convert the message at *file_path* to HTML and print a JSON summary.

    The HTML is written to ``<html_path>/<md5 of the file name>.html``. The
    summary printed to stdout is the extracted dict with ``attachments``
    replaced by ``attachments_count``, ``html`` replaced by ``has_html``, and
    an added ``output`` key holding the path of the HTML file.

    Args:
        file_path: Path of the message file to read.
    """
    # Only the parsing needs the input handle; release it before writing.
    with open(file_path, "rb") as f:
        source_name = f.name
        result = extract(f, source_name)

    digest = hashlib.md5(source_name.encode('utf-8')).hexdigest()
    result_path = os.path.join(html_path, digest + '.html')
    # Explicit UTF-8 so the outcome does not depend on the locale's default
    # encoding; characters that still cannot be encoded are replaced rather
    # than aborting the whole conversion.
    with open(result_path, 'w', encoding='utf-8', errors='replace') as rf:
        write_result(result, rf)

    result['output'] = result_path
    result['attachments_count'] = len(result.pop('attachments'))
    result['has_html'] = len(result.pop('html')) > 0
    # default=str: header values may be non-str header objects, which the
    # json module cannot serialise on its own.
    print(json.dumps(result, indent=2, default=str))
if __name__ == '__main__':
    # Command-line entry point: expects exactly one argument, the message file.
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print("usage: read-eml.py <eml>")
        sys.exit(1)

    eml_path = cli_args[0]
    if os.path.exists(eml_path):
        main(eml_path)
    else:
        print("file does not exist: {}".format(eml_path))
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment