Created
March 23, 2023 01:11
-
-
Save lordjabez/af54beb842d3bd54a91d9f6b31ceff39 to your computer and use it in GitHub Desktop.
Outlook Exporter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import datetime | |
import getpass | |
import os | |
import re | |
import exchangelib | |
import keyring | |
# Command-line interface: where exported items are written and which extra
# Outlook folders to include besides the ones that are always exported.
output_path_help = 'Location to store exported items'
source_folders_help = 'Optional list of Outlook folders to export (Inbox, Sent Items, and Calendar are always exported)'

parser = argparse.ArgumentParser(description='Export Outlook items to timestamped files')
parser.add_argument(
    '-o',
    '--output-path',
    required=True,
    help=output_path_help,
)
parser.add_argument(
    '-s',
    '--source-folders',
    nargs='+',
    default=[],
    metavar='FOLDER_NAME',
    help=source_folders_help,
)
args = parser.parse_args()

# Module-level settings taken from the parsed arguments.
output_path, source_folders = args.output_path, args.source_folders
def sanitize(value, max_length=100):
    """Turn an arbitrary value into a lowercase, filename-safe token.

    The value is converted with ``str()``, lowercased and cut to at most
    ``max_length`` characters; every character other than ``0-9``, ``a-z``
    and ``_`` is then replaced by ``_``.

    Args:
        value: Any object. ``None`` becomes the token ``none``.
        max_length: Maximum number of characters kept. Defaults to 100.

    Returns:
        The sanitized string. Rejected characters are replaced one-for-one,
        so its length equals that of the truncated input.
    """
    token = str(value).lower()[:max_length]
    return re.sub(r'[^0-9a-z_]', '_', token)
def get_password(email_username):
    """Return the email password from the system keyring, prompting if absent.

    Each ``(service, username)`` pair in ``possible_locations`` is looked up
    in the keyring in order and the first stored password is returned. When
    none is found the user is prompted, and the entered password is saved to
    the keyring before being returned.

    Args:
        email_username: Username used as the keyring account name.

    Returns:
        The stored or newly entered password.
    """
    possible_locations = [
        ('Exchange', email_username),
    ]
    for service_name, username in possible_locations:
        password = keyring.get_password(service_name, username)
        if password is not None:
            return password

    password = getpass.getpass('Stored password not found. Please enter email password: ')
    # Pick the save target explicitly (the last candidate) instead of relying
    # on the loop variable still being bound after the loop has finished.
    service_name, username = possible_locations[-1]
    keyring.set_password(service_name, username, password)
    return password
def connect_account():
    """Log in to the Exchange mailbox and return a refreshed account object.

    The username is the current OS user (``getpass.getuser()``) and the
    password comes from ``get_password``. The mailbox address and server are
    placeholders that must be filled in before use.

    Returns:
        An ``exchangelib.Account`` whose folder root has been refreshed.
    """
    email_username = getpass.getuser()
    email_password = get_password(email_username)
    email_address = f''  # TODO
    email_server = f''  # TODO
    credentials = exchangelib.Credentials(email_username, email_password)

    # First connection: built from the server name.
    bootstrap_config = exchangelib.Configuration(server=email_server, credentials=credentials)
    bootstrap_account = exchangelib.Account(
        primary_smtp_address=email_address,
        config=bootstrap_config,
        access_type=exchangelib.DELEGATE,
    )

    # Second connection: rebuilt from the endpoint, auth type and version
    # reported by the first one, with autodiscover switched off.
    # NOTE(review): presumably this pins the negotiated settings - confirm
    # against the exchangelib documentation.
    protocol = bootstrap_account.protocol
    pinned_config = exchangelib.Configuration(
        service_endpoint=protocol.service_endpoint,
        auth_type=protocol.auth_type,
        version=bootstrap_account.version,
        credentials=credentials,
    )
    account = exchangelib.Account(
        primary_smtp_address=bootstrap_account.primary_smtp_address,
        access_type=exchangelib.DELEGATE,
        autodiscover=False,
        config=pinned_config,
    )
    account.root.refresh()
    return account
def get_folders(account):
    """Collect the folders to export, keyed by the name used on disk.

    Inbox, Sent Items and Calendar are always included. Every entry of the
    module-level ``source_folders`` list is added to the email folders under
    its sanitized name; a name that sanitizes to ``inbox`` or ``sent``
    replaces that default entry.

    Args:
        account: Connected account whose ``root`` folder supports ``/``
            navigation by folder name.

    Returns:
        A ``(email_folders, meeting_folders)`` tuple of dicts mapping the
        on-disk name to the folder object.
    """
    # Resolve the shared parent once instead of once per folder.
    store = account.root / 'Top of Information Store'

    email_folders = {
        'inbox': store / 'Inbox',
        'sent': store / 'Sent Items',
    }
    email_folders.update({sanitize(d): store / d for d in source_folders})

    meeting_folders = {
        'calendar': store / 'Calendar',
    }
    return email_folders, meeting_folders
def get_last_date(path):
    """Return the lexicographically greatest subdirectory name in ``path``.

    Only directories are considered; plain files are ignored.

    Args:
        path: Directory to scan.

    Returns:
        The name (not the full path) of the last subdirectory in string order.

    Raises:
        OSError: If ``path`` cannot be scanned (missing, not a directory, ...).
        IndexError: If ``path`` contains no subdirectories.
    """
    # The context manager closes the scandir handle even if is_dir() raises.
    with os.scandir(path) as entries:
        names = [entry.name for entry in entries if entry.is_dir()]
    if not names:
        raise IndexError(f'no subdirectories found in {path!r}')
    return max(names)
def get_last_export(label, name):
    """Return the date of the most recent export found on disk.

    The export tree is laid out as ``output_path/label/name/YYYY/MM/DD``.
    The latest year, month and day directories are looked up in turn and
    combined into a UTC midnight timestamp.

    Args:
        label: Item kind directory (for example ``'emails'``).
        name: Folder name directory below ``label``.

    Returns:
        A timezone-aware ``datetime`` (UTC). Falls back to 2000-01-01 when the
        tree is missing, empty, unreadable, or contains directory names that
        do not form a valid date.
    """
    try:
        base_path = os.path.join(output_path, label, name)
        last_year = get_last_date(base_path)
        year_path = os.path.join(base_path, str(last_year))
        last_month = get_last_date(year_path)
        month_path = os.path.join(year_path, str(last_month))
        last_day = get_last_date(month_path)
        # Converting inside the try means a stray non-numeric directory name
        # triggers the fallback instead of crashing the script.
        return datetime.datetime(
            int(last_year), int(last_month), int(last_day), tzinfo=datetime.timezone.utc
        )
    except (OSError, IndexError, ValueError):
        return datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc)
def write_item(label, name, time, sender, subject, content):
    """Write one exported item to a dated directory under ``output_path``.

    The file lands in ``output_path/label/name/YYYY/MM/DD`` and is named
    ``<YYYYmmdd_HHMMSS>-<sender>-<subject>.<ext>``, where sender and subject
    are passed through ``sanitize`` and the extension is ``eml`` for the
    ``'emails'`` label and ``ics`` otherwise.

    Args:
        label: Item kind; selects the extension and the top-level directory.
        name: Folder name used as the second directory level.
        time: Object with a ``strftime`` method giving the item's timestamp.
        sender: Sender value used in the file name.
        subject: Subject value used in the file name.
        content: Bytes written to the file.
    """
    stamp = time.strftime('%Y%m%d_%H%M%S')
    sender_token = sanitize(sender)
    subject_token = sanitize(subject)

    if label == 'emails':
        suffix = 'eml'
    else:
        suffix = 'ics'

    # Year, month and day are the first eight characters of the timestamp.
    target_dir = os.path.join(output_path, label, name, stamp[0:4], stamp[4:6], stamp[6:8])
    target_path = os.path.join(target_dir, f'{stamp}-{sender_token}-{subject_token}.{suffix}')

    print('Writing', target_path)
    os.makedirs(target_dir, exist_ok=True)
    with open(target_path, 'wb') as handle:
        handle.write(content)
def export_items(label, folders, last_export_time):
    """Export every item newer than ``last_export_time`` from each folder.

    Emails are filtered and timestamped by ``datetime_sent`` and attributed
    to ``sender``; any other label uses ``start`` and ``organizer``. Each
    item is handed to ``write_item``; an ``OSError`` while writing is
    reported and the export continues with the next item.

    Args:
        label: ``'emails'`` for messages; anything else is treated as
            calendar items.
        folders: Mapping of on-disk name to folder object.
        last_export_time: Lower bound (inclusive) for the item filter.
    """
    is_email = label == 'emails'
    # The filter does not depend on the folder, so build it once.
    filter_key = 'datetime_sent__gte' if is_email else 'start__gte'
    filter_params = {filter_key: last_export_time}

    for name, folder in folders.items():
        count = 0
        print(f'Exporting items from {name}')
        for item in folder.all().filter(**filter_params):
            time = item.datetime_sent if is_email else item.start
            mailbox = item.sender if is_email else item.organizer
            # Some items carry no sender/organizer mailbox; pass None on
            # instead of aborting the whole export with an AttributeError.
            sender = getattr(mailbox, 'email_address', None)
            try:
                write_item(label, name, time, sender, item.subject, item.mime_content)
                count += 1
            except OSError as error:
                print(f'ERROR: {error}')
        print(f'Exported {count} {label} from {name}')
# Script body: connect, find where the previous run stopped, then export
# emails and meetings from that point on.
account = connect_account()
email_folders, meeting_folders = get_folders(account)

# NOTE(review): the resume point is read only from the 'emails/archive' tree;
# if no folder is exported under the name 'archive' this presumably always
# falls back to the earliest date - confirm this is intended.
last_export_time = get_last_export('emails', 'archive')
print('Exporting items as of', last_export_time.isoformat())

for label, folders in (('emails', email_folders), ('meetings', meeting_folders)):
    export_items(label, folders, last_export_time)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment