-
-
Save georgy7/3a80bce2cd8bf2f9985c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# Modified. | |
# Original script source: | |
# http://blog.marcbelmont.com/2012/10/script-to-extract-email-attachments.html | |
# https://web.archive.org/web/20150312172727/http://blog.marcbelmont.com/2012/10/script-to-extract-email-attachments.html | |
# Usage: | |
# Run the script from a folder with file "all.mbox" | |
# Attachments will be extracted into subfolder "attachments" | |
# with prefix "m " where m is a message ID in mbox file. | |
# Or | |
# ./extract_mbox_attachments.py -i first.mbox -o attachments1/ | |
# ./extract_mbox_attachments.py -i second.mbox -o attachments2/ | |
# ./extract_mbox_attachments.py --help | |
# --------------- | |
# Please check the unpacked files | |
# with an antivirus before opening them! | |
# --------------- | |
# I make no representations or warranties of any kind concerning | |
# the software, express, implied, statutory or otherwise, | |
# including without limitation warranties of title, merchantability, | |
# fitness for a particular purpose, non infringement, or the | |
# absence of latent or other defects, accuracy, or the present or | |
# absence of errors, whether or not discoverable, all to the | |
# greatest extent permissible under applicable law. | |
import errno | |
import mailbox | |
import mimetypes | |
import os | |
import pathlib # since Python 3.4 | |
import re | |
import traceback | |
from email.header import decode_header | |
import argparse | |
import sys | |
def parse_options(args=[]): | |
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) | |
parser.add_argument('-i', '--input', default='all.mbox', help='Input file') | |
parser.add_argument('-o', '--output', default='attachments/', help='Output folder') | |
parser.add_argument('--no-inline-images', action='store_true') | |
parser.add_argument('--start', | |
type=message_id_type, default=0, | |
help='On which message to start') | |
parser.add_argument('--stop', | |
type=message_id_type, default=100000000000, | |
help='On which message to stop, not included') | |
return parser.parse_args(args) | |
def message_id_type(arg): | |
try: | |
i = int(arg) | |
except ValueError as e: | |
raise argparse.ArgumentTypeError(str(e)) | |
if i < 0: | |
raise argparse.ArgumentTypeError("Must be greater than or equal 0.") | |
return i | |
class Extractor: | |
def __init__(self, options): | |
self.__total = 0 | |
self.__failed = 0 | |
self.options = options | |
assert os.path.isfile(options.input) | |
self.mbox = mailbox.mbox(options.input) | |
if not os.path.exists(options.output): | |
os.makedirs(options.output) | |
self.inline_image_folder = os.path.join(options.output, 'inline_images/') | |
if (not options.no_inline_images) and (not os.path.exists(self.inline_image_folder)): | |
os.makedirs(self.inline_image_folder) | |
def increment_total(self): | |
self.__total += 1 | |
def increment_failed(self): | |
self.__failed += 1 | |
def get_total(self): | |
return self.__total | |
def get_failed(self): | |
return self.__failed | |
def to_file_path(save_to, name): | |
return os.path.join(save_to, name) | |
def get_extension(name): | |
extension = pathlib.Path(name).suffix | |
return extension if len(extension) <= 20 else '' | |
def resolve_name_conflicts(save_to, name, file_paths, attachment_number): | |
file_path = to_file_path(save_to, name) | |
START = 1 | |
iteration_number = START | |
while os.path.normcase(file_path) in file_paths: | |
extension = get_extension(name) | |
iteration = '' if iteration_number <= START else ' (%s)' % iteration_number | |
new_name = '%s attachment %s%s%s' % (name, attachment_number, iteration, extension) | |
file_path = to_file_path(save_to, new_name) | |
iteration_number += 1 | |
file_paths.append(os.path.normcase(file_path)) | |
return file_path | |
# Whitespaces: tab, carriage return, newline, vertical tab, form feed. | |
FORBIDDEN_WHITESPACE_IN_FILENAMES = re.compile('[\t\r\n\v\f]+') | |
OTHER_FORBIDDEN_FN_CHARACTERS = re.compile('[/\\\\\\?%\\*:\\|"<>\0]') | |
def filter_fn_characters(s): | |
result = re.sub(FORBIDDEN_WHITESPACE_IN_FILENAMES, ' ', s) | |
result = re.sub(OTHER_FORBIDDEN_FN_CHARACTERS, '_', result) | |
return result | |
def decode_filename(part, fallback_filename, mid): | |
if part.get_filename() is None: | |
print('Filename is none: %s %s.' % (mid, fallback_filename)) | |
return fallback_filename | |
else: | |
decoded_name = decode_header(part.get_filename()) | |
if isinstance(decoded_name[0][0], str): | |
return decoded_name[0][0] | |
else: | |
try: | |
name_encoding = decoded_name[0][1] | |
return decoded_name[0][0].decode(name_encoding) | |
except: | |
print('Could not decode %s %s attachment name.' % (mid, fallback_filename)) | |
return fallback_filename | |
def write_to_disk(part, file_path): | |
with open(file_path, 'wb') as f: | |
f.write(part.get_payload(decode=True)) | |
def save(extractor, mid, part, attachments_counter, inline_image=False): | |
extractor.increment_total() | |
try: | |
if inline_image: | |
attachments_counter['inline_image'] += 1 | |
attachment_number_string = 'ii' + str(attachments_counter['inline_image']) | |
destination_folder = extractor.inline_image_folder | |
else: | |
attachments_counter['value'] += 1 | |
attachment_number_string = str(attachments_counter['value']) | |
destination_folder = extractor.options.output | |
filename = decode_filename( | |
part, | |
attachment_number_string + str(mimetypes.guess_extension(part.get_content_type()) or ''), | |
mid) | |
filename = filter_fn_characters(filename) | |
filename = '%s %s' % (mid, filename) | |
previous_file_paths = attachments_counter['file_paths'] | |
try: | |
write_to_disk(part, resolve_name_conflicts( | |
destination_folder, filename, | |
previous_file_paths, | |
attachment_number_string)) | |
except OSError as e: | |
if e.errno == errno.ENAMETOOLONG: | |
short_name = '%s %s%s' % (mid, attachment_number_string, get_extension(filename)) | |
write_to_disk(part, resolve_name_conflicts( | |
destination_folder, short_name, | |
previous_file_paths, | |
attachment_number_string)) | |
else: | |
raise | |
except: | |
traceback.print_exc() | |
extractor.increment_failed() | |
def check_part(extractor, mid, part, attachments_counter): | |
mime_type = part.get_content_type() | |
if part.is_multipart(): | |
for p in part.get_payload(): | |
check_part(extractor, mid, p, attachments_counter) | |
elif (part.get_content_disposition() == 'attachment') \ | |
or ((part.get_content_disposition() != 'inline') and (part.get_filename() is not None)): | |
save(extractor, mid, part, attachments_counter) | |
elif (mime_type.startswith('application/') and not mime_type == 'application/javascript') \ | |
or mime_type.startswith('model/') \ | |
or mime_type.startswith('audio/') \ | |
or mime_type.startswith('video/'): | |
message_id_content_type = 'Message id = %s, Content-type = %s.' % (mid, mime_type) | |
if part.get_content_disposition() == 'inline': | |
print('Extracting inline part... ' + message_id_content_type) | |
else: | |
print('Other Content-disposition... ' + message_id_content_type) | |
save(extractor, mid, part, attachments_counter) | |
elif (not extractor.options.no_inline_images) and mime_type.startswith('image/'): | |
save(extractor, mid, part, attachments_counter, True) | |
def process_message(extractor, mid): | |
msg = extractor.mbox.get_message(mid) | |
if msg.is_multipart(): | |
attachments_counter = { | |
'value': 0, | |
'inline_image': 0, | |
'file_paths': [] | |
} | |
for part in msg.get_payload(): | |
check_part(extractor, mid, part, attachments_counter) | |
def extract_mbox_file(options): | |
extractor = Extractor(options) | |
print() | |
for i in range(options.start, options.stop): | |
try: | |
process_message(extractor, i) | |
except KeyError: | |
print('The whole mbox file was processed.') | |
break | |
if i % 1000 == 0: | |
print('Messages processed: {}'.format(i)) | |
print() | |
print('Total files: %s' % extractor.get_total()) | |
print('Failed: %s' % extractor.get_failed()) | |
if __name__ == "__main__": | |
extract_mbox_file(parse_options(sys.argv[1:])) |
@georgy7 Thank you! This is just what I needed.
thank you , amazing, extracted 150ish excel files in seconds.
My own attempt in swift failed and i ended up with 800k files. lol. THank you again
@georgy7 Thank You! It took out 30000+ files in just 5 mins
Thank you so much for this, worked perfect! 1k files took mere seconds. I tried so many other methods and was striking out, again many thanks.
Thanx very much. Worked like a charm, all my pdfs were extracted successfully.
thanks.. i use grok and pyinstaller to make a exe file with graphical interface .. works!...
https://mega.nz/file/40k0CJqT#xoZsG-D3fm0iDf0ZrsJBSJEGQOaYYkyVk0zRbsxR7sg
thanks.. i use grok and pyinstaller to make a exe file with graphical interface .. works!... https://mega.nz/file/40k0CJqT#xoZsG-D3fm0iDf0ZrsJBSJEGQOaYYkyVk0zRbsxR7sg
Why did this need a graphical interface?
thanks.. i use grok and pyinstaller to make a exe file with graphical interface .. works!... https://mega.nz/file/40k0CJqT#xoZsG-D3fm0iDf0ZrsJBSJEGQOaYYkyVk0zRbsxR7sg
Why did this need a graphical interface?
Oh easy select file, output path and add a progress bar .. my thoughts
@georgy7 thank you for this, extremely usefull