Skip to content

Instantly share code, notes, and snippets.

@georgy7
Last active November 14, 2024 08:37
Show Gist options
  • Save georgy7/3a80bce2cd8bf2f9985c to your computer and use it in GitHub Desktop.
Save georgy7/3a80bce2cd8bf2f9985c to your computer and use it in GitHub Desktop.
Extract attachments from mbox file.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Modified.
# Original script source:
# http://blog.marcbelmont.com/2012/10/script-to-extract-email-attachments.html
# https://web.archive.org/web/20150312172727/http://blog.marcbelmont.com/2012/10/script-to-extract-email-attachments.html
# Usage:
# Run the script from a folder with file "all.mbox"
# Attachments will be extracted into subfolder "attachments"
# with prefix "m " where m is the message's index (key) in the mbox file.
# Or
# ./extract_mbox_attachments.py -i first.mbox -o attachments1/
# ./extract_mbox_attachments.py -i second.mbox -o attachments2/
# ./extract_mbox_attachments.py --help
# ---------------
# Please check the unpacked files
# with an antivirus before opening them!
# ---------------
# I make no representations or warranties of any kind concerning
# the software, express, implied, statutory or otherwise,
# including without limitation warranties of title, merchantability,
# fitness for a particular purpose, non infringement, or the
# absence of latent or other defects, accuracy, or the presence or
# absence of errors, whether or not discoverable, all to the
# greatest extent permissible under applicable law.
import errno
import mailbox
import mimetypes
import os
import pathlib # since Python 3.4
import re
import traceback
from email.header import decode_header
import argparse
import sys
def parse_options(args=None):
    """Parse the command-line arguments of the extractor.

    Args:
        args: Sequence of argument strings (without the program name).
            When omitted or ``None`` an empty list is parsed, so every
            option takes its default value.

    Returns:
        The ``argparse.Namespace`` with the attributes ``input``,
        ``output``, ``no_inline_images``, ``start`` and ``stop``.
    """
    # A None sentinel replaces the former mutable ``[]`` default; the
    # observable behaviour of calling parse_options() stays the same.
    if args is None:
        args = []
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-i', '--input', default='all.mbox', help='Input file')
    parser.add_argument('-o', '--output', default='attachments/', help='Output folder')
    parser.add_argument('--no-inline-images', action='store_true',
                        help='Do not extract inline images')
    parser.add_argument('--start',
                        type=message_id_type, default=0,
                        help='On which message to start')
    parser.add_argument('--stop',
                        type=message_id_type, default=100000000000,
                        help='On which message to stop, not included')
    return parser.parse_args(args)
def message_id_type(arg):
    """Convert a command-line value into a non-negative message number.

    Used as an argparse ``type=`` callable.

    Args:
        arg: The raw option value.

    Returns:
        The value converted to ``int``.

    Raises:
        argparse.ArgumentTypeError: if the value is not an integer or is
            negative.
    """
    try:
        number = int(arg)
    except ValueError as error:
        raise argparse.ArgumentTypeError(str(error))
    if number >= 0:
        return number
    raise argparse.ArgumentTypeError("Must be greater than or equal 0.")
class Extractor:
    """Opened mbox, output locations and statistics of one extraction run.

    Attributes set by the constructor:
        options: The parsed options object passed in.
        mbox: ``mailbox.mbox`` opened on ``options.input``.
        inline_image_folder: ``inline_images/`` inside ``options.output``.
    """

    def __init__(self, options):
        """Open the mailbox and create the output folders.

        Args:
            options: Object with the attributes ``input``, ``output`` and
                ``no_inline_images``.

        Raises:
            FileNotFoundError: if ``options.input`` is not an existing file.
        """
        self.__total = 0
        self.__failed = 0
        self.options = options
        # Explicit check instead of ``assert``: assertions are removed under
        # ``python -O``, and mailbox.mbox would then silently create an
        # empty file for a mistyped path.
        if not os.path.isfile(options.input):
            raise FileNotFoundError('Input file not found: %s' % options.input)
        self.mbox = mailbox.mbox(options.input, create=False)
        # exist_ok avoids the check-then-create race of a separate
        # os.path.exists() test.
        os.makedirs(options.output, exist_ok=True)
        self.inline_image_folder = os.path.join(options.output, 'inline_images/')
        if not options.no_inline_images:
            os.makedirs(self.inline_image_folder, exist_ok=True)

    def increment_total(self):
        """Count one more file that was attempted."""
        self.__total += 1

    def increment_failed(self):
        """Count one more file that could not be saved."""
        self.__failed += 1

    def get_total(self):
        """Return the number of files attempted so far."""
        return self.__total

    def get_failed(self):
        """Return the number of files that failed so far."""
        return self.__failed
def to_file_path(save_to, name):
    """Return the path of file ``name`` inside the folder ``save_to``."""
    full_path = os.path.join(save_to, name)
    return full_path
def get_extension(name):
    """Return the suffix of ``name`` (including the dot).

    A suffix longer than 20 characters is not treated as an extension; an
    empty string is returned for it.
    """
    suffix = pathlib.Path(name).suffix
    if len(suffix) > 20:
        return ''
    return suffix
def resolve_name_conflicts(save_to, name, file_paths, attachment_number):
    """Pick a path in ``save_to`` that is not yet listed in ``file_paths``.

    The first candidate is ``name`` itself.  While the case-normalised
    candidate is already in ``file_paths``, a new name of the form
    ``"<name> attachment <attachment_number><extension>"`` is tried, with
    `` (2)``, `` (3)``, ... inserted before the extension from the second
    retry on.

    Args:
        save_to: Destination folder.
        name: Desired file name.
        file_paths: List of case-normalised paths already taken; the chosen
            path is appended to it.
        attachment_number: Text identifying the attachment in the message.

    Returns:
        The chosen file path.
    """
    candidate = to_file_path(save_to, name)
    suffix = get_extension(name)
    attempt = 1
    while os.path.normcase(candidate) in file_paths:
        if attempt > 1:
            counter = ' (%s)' % attempt
        else:
            counter = ''
        renamed = '%s attachment %s%s%s' % (name, attachment_number, counter, suffix)
        candidate = to_file_path(save_to, renamed)
        attempt += 1
    file_paths.append(os.path.normcase(candidate))
    return candidate
# Runs of these whitespace characters are replaced by a single space:
# tab, carriage return, newline, vertical tab, form feed.
FORBIDDEN_WHITESPACE_IN_FILENAMES = re.compile('[\t\r\n\v\f]+')
# Each of these characters is replaced by an underscore.
OTHER_FORBIDDEN_FN_CHARACTERS = re.compile('[/\\\\\\?%\\*:\\|"<>\0]')


def filter_fn_characters(s):
    """Return ``s`` with characters unsuitable for file names replaced.

    Every run of tab/CR/LF/VT/FF becomes one space; each of
    ``/ \\ ? % * : | " < >`` and NUL becomes an underscore.
    """
    spaced = FORBIDDEN_WHITESPACE_IN_FILENAMES.sub(' ', s)
    return OTHER_FORBIDDEN_FN_CHARACTERS.sub('_', spaced)
def decode_filename(part, fallback_filename, mid):
    """Return the decoded file name of a message part.

    Args:
        part: Message part providing ``get_filename()``.
        fallback_filename: Name returned when the part has no file name or
            the name cannot be decoded.
        mid: Message key, only used in the diagnostic output.

    Returns:
        The file name as ``str``, or ``fallback_filename``.
    """
    raw_name = part.get_filename()
    if raw_name is None:
        print('Filename is none: %s %s.' % (mid, fallback_filename))
        return fallback_filename
    try:
        fragments = []
        # decode_header() may return several (value, charset) pairs, e.g.
        # for a name that mixes plain text with encoded words.  All of them
        # are joined; using only the first one would truncate the name.
        for value, charset in decode_header(raw_name):
            if isinstance(value, bytes):
                # Fragments that were not encoded carry no charset; such
                # header text is expected to be plain ASCII.
                value = value.decode(charset or 'ascii')
            fragments.append(value)
        return ''.join(fragments)
    except (LookupError, UnicodeError, ValueError):
        # Unknown charset or undecodable bytes: best effort, use the
        # generated name instead of failing the attachment.
        print('Could not decode %s %s attachment name.' % (mid, fallback_filename))
        return fallback_filename
def write_to_disk(part, file_path):
    """Write the decoded payload of ``part`` to ``file_path``.

    The file is opened in binary write mode before the payload is decoded.
    """
    with open(file_path, 'wb') as output_file:
        payload = part.get_payload(decode=True)
        output_file.write(payload)
def save(extractor, mid, part, attachments_counter, inline_image=False):
    """Write one message part to disk and update the run statistics.

    Args:
        extractor: Extractor providing the output folders and the counters.
        mid: Key of the message in the mbox; used as the file name prefix.
        part: Message part whose decoded payload is written.
        attachments_counter: Per-message dict with the keys ``'value'``,
            ``'inline_image'`` and ``'file_paths'``; updated in place.
        inline_image: When true, the part is numbered as an inline image
            (``ii1``, ``ii2``, ...) and stored in the inline image folder.

    Errors are reported with a traceback and counted as failed; they are
    not propagated to the caller.
    """
    extractor.increment_total()
    try:
        if inline_image:
            attachments_counter['inline_image'] += 1
            attachment_number_string = 'ii' + str(attachments_counter['inline_image'])
            destination_folder = extractor.inline_image_folder
        else:
            attachments_counter['value'] += 1
            attachment_number_string = str(attachments_counter['value'])
            destination_folder = extractor.options.output

        # Name used when the part has no (decodable) file name: the
        # attachment number plus an extension guessed from the MIME type.
        guessed_extension = mimetypes.guess_extension(part.get_content_type()) or ''
        filename = decode_filename(
            part,
            attachment_number_string + str(guessed_extension),
            mid)
        filename = filter_fn_characters(filename)
        filename = '%s %s' % (mid, filename)

        previous_file_paths = attachments_counter['file_paths']

        try:
            write_to_disk(part, resolve_name_conflicts(
                destination_folder, filename,
                previous_file_paths,
                attachment_number_string))
        except OSError as e:
            if e.errno != errno.ENAMETOOLONG:
                raise
            # The file system rejected the name as too long: retry with a
            # short generated name that keeps only the extension.
            short_name = '%s %s%s' % (mid, attachment_number_string, get_extension(filename))
            write_to_disk(part, resolve_name_conflicts(
                destination_folder, short_name,
                previous_file_paths,
                attachment_number_string))
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that Ctrl+C
        # (KeyboardInterrupt) and SystemExit still stop the run instead of
        # being counted as a failed attachment.
        traceback.print_exc()
        extractor.increment_failed()
def check_part(extractor, mid, part, attachments_counter):
    """Walk a message part and save everything that looks like a file.

    Multipart parts are traversed recursively.  A leaf part is saved when

    * its Content-Disposition is ``attachment``, or it has a file name and
      is not ``inline``;
    * otherwise, its type is ``application/*`` (except
      ``application/javascript``), ``model/*``, ``audio/*`` or ``video/*``;
    * otherwise, it is an ``image/*`` part and inline images are enabled
      (saved as an inline image).

    Args:
        extractor: Extractor with the options and output state.
        mid: Key of the message in the mbox.
        part: The message part to inspect.
        attachments_counter: Per-message counter dict passed on to save().
    """
    mime_type = part.get_content_type()

    if part.is_multipart():
        for child in part.get_payload():
            check_part(extractor, mid, child, attachments_counter)
        return

    disposition = part.get_content_disposition()
    named_and_not_inline = disposition != 'inline' and part.get_filename() is not None
    if disposition == 'attachment' or named_and_not_inline:
        save(extractor, mid, part, attachments_counter)
        return

    is_application = mime_type.startswith('application/') and mime_type != 'application/javascript'
    if is_application or mime_type.startswith(('model/', 'audio/', 'video/')):
        message_id_content_type = 'Message id = %s, Content-type = %s.' % (mid, mime_type)
        if disposition == 'inline':
            print('Extracting inline part... ' + message_id_content_type)
        else:
            print('Other Content-disposition... ' + message_id_content_type)
        save(extractor, mid, part, attachments_counter)
        return

    if (not extractor.options.no_inline_images) and mime_type.startswith('image/'):
        save(extractor, mid, part, attachments_counter, True)
def process_message(extractor, mid):
    """Extract the attachments of the message stored under key ``mid``.

    The message is fetched with ``extractor.mbox.get_message(mid)``.  Only
    multipart messages are inspected; each top-level part is handed to
    check_part() together with a fresh per-message counter dict.
    """
    message = extractor.mbox.get_message(mid)
    if not message.is_multipart():
        return
    attachments_counter = dict(value=0, inline_image=0, file_paths=[])
    for top_level_part in message.get_payload():
        check_part(extractor, mid, top_level_part, attachments_counter)
def extract_mbox_file(options):
    """Extract the attachments of the selected messages and print a summary.

    Messages with keys from ``options.start`` up to, but not including,
    ``options.stop`` are processed.  The loop ends early when the mailbox
    reports (with ``KeyError``) that a key does not exist.

    Args:
        options: Parsed options as returned by parse_options().
    """
    extractor = Extractor(options)
    try:
        print()
        for i in range(options.start, options.stop):
            try:
                process_message(extractor, i)
            except KeyError:
                print('The whole mbox file was processed.')
                break
            if i % 1000 == 0:
                print('Messages processed: {}'.format(i))
    finally:
        # Release the file handle held by the mailbox even when processing
        # stops with an error.
        extractor.mbox.close()
    print()
    print('Total files: %s' % extractor.get_total())
    print('Failed: %s' % extractor.get_failed())
# Script entry point: parse the command line (without the program name)
# and run the extraction.
if __name__ == "__main__":
    cli_options = parse_options(sys.argv[1:])
    extract_mbox_file(cli_options)
@moekidu
Copy link

moekidu commented Jan 21, 2024

@georgy7 thank you for this, extremely useful

@bounceswoosh
Copy link

@georgy7 Thank you! This is just what I needed.

@Oil3
Copy link

Oil3 commented Mar 30, 2024

Thank you, amazing, extracted 150ish Excel files in seconds.
My own attempt in Swift failed and I ended up with 800k files. lol. Thank you again

@Ayno-587
Copy link

Ayno-587 commented Apr 24, 2024

@georgy7 Thank You! It took out 30000+ files in just 5 mins

@crawlgsx
Copy link

Thank you so much for this, worked perfect! 1k files took mere seconds. I tried so many other methods and was striking out, again many thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment