Last active
November 14, 2024 08:37
-
-
Save georgy7/3a80bce2cd8bf2f9985c to your computer and use it in GitHub Desktop.
Extract attachments from mbox file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# Modified. | |
# Original script source: | |
# http://blog.marcbelmont.com/2012/10/script-to-extract-email-attachments.html | |
# https://web.archive.org/web/20150312172727/http://blog.marcbelmont.com/2012/10/script-to-extract-email-attachments.html | |
# Usage: | |
# Run the script from a folder with file "all.mbox" | |
# Attachments will be extracted into subfolder "attachments" | |
# with prefix "m " where m is a message ID in mbox file. | |
# Or | |
# ./extract_mbox_attachments.py -i first.mbox -o attachments1/ | |
# ./extract_mbox_attachments.py -i second.mbox -o attachments2/ | |
# ./extract_mbox_attachments.py --help | |
# --------------- | |
# Please check the unpacked files | |
# with an antivirus before opening them! | |
# --------------- | |
# I make no representations or warranties of any kind concerning | |
# the software, express, implied, statutory or otherwise, | |
# including without limitation warranties of title, merchantability, | |
# fitness for a particular purpose, non infringement, or the | |
# absence of latent or other defects, accuracy, or the present or | |
# absence of errors, whether or not discoverable, all to the | |
# greatest extent permissible under applicable law. | |
import errno | |
import mailbox | |
import mimetypes | |
import os | |
import pathlib # since Python 3.4 | |
import re | |
import traceback | |
from email.header import decode_header | |
import argparse | |
import sys | |
def parse_options(args=[]): | |
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) | |
parser.add_argument('-i', '--input', default='all.mbox', help='Input file') | |
parser.add_argument('-o', '--output', default='attachments/', help='Output folder') | |
parser.add_argument('--no-inline-images', action='store_true') | |
parser.add_argument('--start', | |
type=message_id_type, default=0, | |
help='On which message to start') | |
parser.add_argument('--stop', | |
type=message_id_type, default=100000000000, | |
help='On which message to stop, not included') | |
return parser.parse_args(args) | |
def message_id_type(arg): | |
try: | |
i = int(arg) | |
except ValueError as e: | |
raise argparse.ArgumentTypeError(str(e)) | |
if i < 0: | |
raise argparse.ArgumentTypeError("Must be greater than or equal 0.") | |
return i | |
class Extractor: | |
def __init__(self, options): | |
self.__total = 0 | |
self.__failed = 0 | |
self.options = options | |
assert os.path.isfile(options.input) | |
self.mbox = mailbox.mbox(options.input) | |
if not os.path.exists(options.output): | |
os.makedirs(options.output) | |
self.inline_image_folder = os.path.join(options.output, 'inline_images/') | |
if (not options.no_inline_images) and (not os.path.exists(self.inline_image_folder)): | |
os.makedirs(self.inline_image_folder) | |
def increment_total(self): | |
self.__total += 1 | |
def increment_failed(self): | |
self.__failed += 1 | |
def get_total(self): | |
return self.__total | |
def get_failed(self): | |
return self.__failed | |
def to_file_path(save_to, name): | |
return os.path.join(save_to, name) | |
def get_extension(name): | |
extension = pathlib.Path(name).suffix | |
return extension if len(extension) <= 20 else '' | |
def resolve_name_conflicts(save_to, name, file_paths, attachment_number): | |
file_path = to_file_path(save_to, name) | |
START = 1 | |
iteration_number = START | |
while os.path.normcase(file_path) in file_paths: | |
extension = get_extension(name) | |
iteration = '' if iteration_number <= START else ' (%s)' % iteration_number | |
new_name = '%s attachment %s%s%s' % (name, attachment_number, iteration, extension) | |
file_path = to_file_path(save_to, new_name) | |
iteration_number += 1 | |
file_paths.append(os.path.normcase(file_path)) | |
return file_path | |
# Whitespaces: tab, carriage return, newline, vertical tab, form feed. | |
FORBIDDEN_WHITESPACE_IN_FILENAMES = re.compile('[\t\r\n\v\f]+') | |
OTHER_FORBIDDEN_FN_CHARACTERS = re.compile('[/\\\\\\?%\\*:\\|"<>\0]') | |
def filter_fn_characters(s): | |
result = re.sub(FORBIDDEN_WHITESPACE_IN_FILENAMES, ' ', s) | |
result = re.sub(OTHER_FORBIDDEN_FN_CHARACTERS, '_', result) | |
return result | |
def decode_filename(part, fallback_filename, mid): | |
if part.get_filename() is None: | |
print('Filename is none: %s %s.' % (mid, fallback_filename)) | |
return fallback_filename | |
else: | |
decoded_name = decode_header(part.get_filename()) | |
if isinstance(decoded_name[0][0], str): | |
return decoded_name[0][0] | |
else: | |
try: | |
name_encoding = decoded_name[0][1] | |
return decoded_name[0][0].decode(name_encoding) | |
except: | |
print('Could not decode %s %s attachment name.' % (mid, fallback_filename)) | |
return fallback_filename | |
def write_to_disk(part, file_path): | |
with open(file_path, 'wb') as f: | |
f.write(part.get_payload(decode=True)) | |
def save(extractor, mid, part, attachments_counter, inline_image=False): | |
extractor.increment_total() | |
try: | |
if inline_image: | |
attachments_counter['inline_image'] += 1 | |
attachment_number_string = 'ii' + str(attachments_counter['inline_image']) | |
destination_folder = extractor.inline_image_folder | |
else: | |
attachments_counter['value'] += 1 | |
attachment_number_string = str(attachments_counter['value']) | |
destination_folder = extractor.options.output | |
filename = decode_filename( | |
part, | |
attachment_number_string + str(mimetypes.guess_extension(part.get_content_type()) or ''), | |
mid) | |
filename = filter_fn_characters(filename) | |
filename = '%s %s' % (mid, filename) | |
previous_file_paths = attachments_counter['file_paths'] | |
try: | |
write_to_disk(part, resolve_name_conflicts( | |
destination_folder, filename, | |
previous_file_paths, | |
attachment_number_string)) | |
except OSError as e: | |
if e.errno == errno.ENAMETOOLONG: | |
short_name = '%s %s%s' % (mid, attachment_number_string, get_extension(filename)) | |
write_to_disk(part, resolve_name_conflicts( | |
destination_folder, short_name, | |
previous_file_paths, | |
attachment_number_string)) | |
else: | |
raise | |
except: | |
traceback.print_exc() | |
extractor.increment_failed() | |
def check_part(extractor, mid, part, attachments_counter): | |
mime_type = part.get_content_type() | |
if part.is_multipart(): | |
for p in part.get_payload(): | |
check_part(extractor, mid, p, attachments_counter) | |
elif (part.get_content_disposition() == 'attachment') \ | |
or ((part.get_content_disposition() != 'inline') and (part.get_filename() is not None)): | |
save(extractor, mid, part, attachments_counter) | |
elif (mime_type.startswith('application/') and not mime_type == 'application/javascript') \ | |
or mime_type.startswith('model/') \ | |
or mime_type.startswith('audio/') \ | |
or mime_type.startswith('video/'): | |
message_id_content_type = 'Message id = %s, Content-type = %s.' % (mid, mime_type) | |
if part.get_content_disposition() == 'inline': | |
print('Extracting inline part... ' + message_id_content_type) | |
else: | |
print('Other Content-disposition... ' + message_id_content_type) | |
save(extractor, mid, part, attachments_counter) | |
elif (not extractor.options.no_inline_images) and mime_type.startswith('image/'): | |
save(extractor, mid, part, attachments_counter, True) | |
def process_message(extractor, mid): | |
msg = extractor.mbox.get_message(mid) | |
if msg.is_multipart(): | |
attachments_counter = { | |
'value': 0, | |
'inline_image': 0, | |
'file_paths': [] | |
} | |
for part in msg.get_payload(): | |
check_part(extractor, mid, part, attachments_counter) | |
def extract_mbox_file(options): | |
extractor = Extractor(options) | |
print() | |
for i in range(options.start, options.stop): | |
try: | |
process_message(extractor, i) | |
except KeyError: | |
print('The whole mbox file was processed.') | |
break | |
if i % 1000 == 0: | |
print('Messages processed: {}'.format(i)) | |
print() | |
print('Total files: %s' % extractor.get_total()) | |
print('Failed: %s' % extractor.get_failed()) | |
if __name__ == "__main__": | |
extract_mbox_file(parse_options(sys.argv[1:])) |
@georgy7 Thank you! This is just what I needed.
thank you , amazing, extracted 150ish excel files in seconds.
My own attempt in swift failed and i ended up with 800k files. lol. THank you again
@georgy7 Thank You! It took out 30000+ files in just 5 mins
Thank you so much for this, worked perfect! 1k files took mere seconds. I tried so many other methods and was striking out, again many thanks.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@georgy7 thank you for this, extremely usefull