Created
June 15, 2016 00:03
-
-
Save richardsonlima/335a5e288b5fc7d165e157c97ebcd2b2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Upload mbox format email to zimbra | |
######################################################################## | |
# Libraries | |
######################################################################## | |
import email, email.Errors, mailbox, imaplib, sys, getopt | |
import os.path, StringIO, re, tempfile | |
######################################################################## | |
# Configuration defaults | |
######################################################################## | |
# All settings live in a single module-level dictionary, so a function
# needs only one "global config" statement to reach every value.
#
config = {
    # Program name, used as the prefix of log lines and in the usage text
    'self': os.path.basename(sys.argv[0]),
    # "notice"; Syslog-style priority level
    'verbose': 5,
    # Destination mailbox for a single-file import
    'mailbox': 'Inbox',
    # 1 when -r was given, 0 otherwise
    'recursive_mode': 0,
}
######################################################################## | |
# Functions | |
######################################################################## | |
def main():
    """Parse the command line, then run the recursive or single import."""
    global config
    output('info', 'main(): starting')
    process_options()
    recursive = config['recursive_mode'] == 1
    if not recursive:
        output('debug', 'main(): about to run single_import()')
        single_import()
    else:
        output('debug', 'main(): about to run recursive_import()')
        recursive_import()
    output('info', 'main(): completed')
######################################################################## | |
# TODO test mbox files with . in the name | |
# TODO test both absolute and relative paths in import | |
# Given an mbox file path, translate it to an IMAP-style mailbox path | |
def source2target(source):
    """Translate an mbox file path into an IMAP-style mailbox path.

    Every "." in source is replaced with "_" (two warnings are logged
    when that happens), every "/" becomes ".", and the leading component
    up to and including the first separator is removed, so that
    "mail/lists/python" becomes "lists.python".

    A path without any separator is returned with only the "." -> "_"
    replacement applied.
    """
    output('info', 'source2target(%s): starting' % source)
    target = source
    if '.' in source:
        output('warning', 'mbox file "%s" contains "." character' % source)
        output('warning', 'Replacing "." with "_"')
        target = target.replace('.', '_')
    # Translate to IMAP-style path separator (replace "/" with ".")
    target = target.replace('/', '.')
    # Strip off the containing directory (up to the first "." character).
    # The separator has to be matched as a literal ".": an unescaped "."
    # also matches the last character of a name that has no separator at
    # all, which erased such names completely.
    target = re.sub(r'^[^.]+\.', '', target)
    output('info', 'source2target(%s): returning %s' % (source, target))
    return target
######################################################################## | |
# Upload all files in a hierarchy to an IMAP server | |
def recursive_import():
    """Upload every mbox file below config['source'] to its own mailbox.

    Exits with status 1 when config['source'] is not a directory.
    Otherwise the mbox files are collected with build_file_list(), each
    path is made relative to the parent of the source directory and
    translated with source2target(), the resulting mailboxes are passed
    to create_mailboxes(), and every file is uploaded with do_import().
    """
    global config
    output('info', 'recursive_import(): starting')
    if not os.path.isdir(config['source']):
        output('crit', 'Argument must be a directory when using -r')
        output('crit', 'Given "%s")' % config['source'])
        sys.exit(1)

    # A trailing "/" would make dirname() return the directory itself
    # instead of its parent, and would put "//" into the collected paths.
    root = config['source'].rstrip('/') or '/'
    source_list = build_file_list(root)
    output('debug', 'recursive_import(): source_list = %s' % source_list)

    # If the source directory was specified as a path with more than one
    # component, everything above its last component (the containing
    # directory) is stripped, since the hierarchy on the server is
    # created relative to that last component.
    base = os.path.dirname(root)
    if not base.endswith('/'):
        base = base + '/'
    output('debug', 'recursive_import(): base = %s' % base)

    def to_target(source):
        # Plain prefix comparison: the directory name is data, not a
        # regular expression, so it must not be interpreted as one.
        relative = source
        if relative.startswith(base):
            relative = relative[len(base):]
        return source2target(relative)

    # Translate each path exactly once so that source2target() warnings
    # are not emitted twice per file.
    target_list = [to_target(source) for source in source_list]
    output('debug', 'recursive_import(): target_list = %s' % target_list)
    create_mailboxes(target_list)

    for source, target in zip(source_list, target_list):
        output('debug', 'recursive_import(): source = %s' % source)
        output('debug', 'recursive_import(): target = %s' % target)
        output('notice', 'Starting import of %s to %s' % (source, target))
        do_import(source, target)
        output('notice', 'Finished import of %s to %s' % (source, target))
    output('info', 'recursive_import(): completed')
######################################################################## | |
def single_import():
    """Upload the mbox file config['source'] into config['mailbox']."""
    global config
    output('info', 'single_import(): starting')
    mbox_path, destination = config['source'], config['mailbox']
    endpoints = (mbox_path, destination)
    output('notice', 'Starting import of %s to %s' % endpoints)
    do_import(mbox_path, destination)
    output('notice', 'Finished import of %s to %s' % endpoints)
    output('info', 'single_import(): completed')
######################################################################## | |
# Process command line options | |
def process_options():
    """Read sys.argv and store the resulting settings in config.

    Recognized options: -m MAILBOX, -r, -u USER and -v LEVEL.  The
    options -i, -s and -p are still accepted (with an argument) and
    ignored, as before.  Exactly one non-option argument is required; it
    is stored in config['source'].

    Exits with status 1 on an unknown option, on more than one file
    argument, or when no user was given; prints the usage text and exits
    with status 0 when no file argument was given.
    """
    global config
    output('info', 'process_options(): starting')
    try:
        # "m:" was missing from the option string, so the documented
        # -m MAILBOX option was always rejected as unknown.
        opts, args = getopt.getopt(sys.argv[1:], "i:m:rs:u:v:p:")
    except getopt.GetoptError:
        usage()
        sys.exit(1)
    for option, argument in opts:
        if option == '-m':
            config['mailbox'] = argument
        elif option == '-r':
            config['recursive_mode'] = 1
        elif option == '-u':
            config['user'] = argument
        elif option == '-v':
            # Command line values are always strings; turn "7" into 7 so
            # that numeric levels are recognized.
            if argument.isdigit():
                argument = int(argument)
            config['verbose'] = argument
    # Make sure desired log level is stored as an integer
    config['verbose'] = numeric_log_level(config['verbose'])
    output('debug', 'process_options(): opts = %s' % opts)
    output('debug', 'process_options(): args = %s' % args)
    # Summarize config.  'user' has no default, so it is read with get()
    # to avoid a KeyError when -u was not given.
    output('debug', "process_options(): config['mailbox'] = %s" %
           config['mailbox'])
    output('debug', "process_options(): config['user'] = %s" %
           config.get('user'))
    output('debug', "process_options(): config['verbose'] = %s" %
           config['verbose'])
    output('debug', "process_options(): config['recursive_mode'] = %s" %
           config['recursive_mode'])
    if len(args) == 0:
        usage()
        sys.exit()
    if len(args) != 1:
        output('crit', 'Too many file arguments: %s' % ' '.join(args))
        output('crit', 'Expecting only one; aborting')
        sys.exit(1)
    if 'user' not in config:
        # The upload command needs a user; fail here with a clear
        # message rather than with a KeyError later on.
        output('crit', 'No user given; use -u USER')
        sys.exit(1)
    config['source'] = args[0]
    output('info', 'process_options(): completed')
######################################################################## | |
# Return true if file is in mbox format | |
def is_mbox_file(file):
    """Return True if the file at path `file` starts with "From ".

    Only the first five characters are read, and the file is closed
    again before returning.
    """
    output('info', 'is_mbox_file(%s): starting' % file)
    handle = open(file)
    try:
        # Reading five characters is enough for the check and avoids
        # loading a very long first "line" of a non-mbox file.
        return handle.read(5) == 'From '
    finally:
        handle.close()
######################################################################## | |
# Given a directory, return a list of contained mbox files | |
def build_file_list(node):
    """Return the paths of all mbox files found below directory `node`.

    Files directly inside `node` come first, followed by the results for
    each subdirectory; paths are built as node + '/' + entry.
    """
    output('info', 'build_file_list(%s): starting' % node)
    mbox_paths = []
    subdirectories = []
    for name in os.listdir(node):
        path = node + '/' + name
        if os.path.isfile(path):
            if is_mbox_file(path):
                mbox_paths.append(path)
        elif os.path.isdir(path):
            subdirectories.append(path)
    # Descend only after the files of this level have been collected
    for subdirectory in subdirectories:
        mbox_paths.extend(build_file_list(subdirectory))
    return mbox_paths
######################################################################## | |
def create_mailboxes(mailboxes):
    """Log a "Creating mailbox" notice for each mailbox not yet present.

    NOTE: this is still a stub.  The list of existing mailboxes is
    hard-coded as empty and no mailbox is actually created; see the
    TODO markers below.
    """
    global config
    output('info', 'create_mailboxes(%s): starting' % mailboxes)

    # Creating a mailbox that already exists is an error, so only the
    # missing ones should be created.  A mailbox listing line looks like
    #
    # (\Noinferiors) "." "INBOX"
    #
    # and the folder name is the last quoted string on the line.
    def folder_name(listing_line):
        return re.search(r'^.*"\." "(.*)"', listing_line).group(1)

    listing = ''  # TODO 'zmmailbox -z -m %s gaf' % config['user']
    existing = [folder_name(entry) for entry in listing]
    for wanted in mailboxes:
        if wanted in existing:
            continue
        output('notice', 'Creating mailbox: ' + wanted)
        #TODO 'zmmailbox -z -m %s cf %s' % (config['user'], mailbox)
######################################################################## | |
# Take an integer or string log level and return an integer log level | |
# | |
def numeric_log_level(level):
    """Return the integer syslog-style priority for `level`.

    `level` may be an integer from 0 to 7, one of the level names below,
    or a string holding an integer from 0 to 7 (such as "7").  Anything
    else logs a warning and yields 0 ("emerg").

    crit, alert, emerg: critical error, immediate termination
    err: non-fatal problem
    warning: possibly negative informational message
    notice: neutral informational... TODO
    info: function calls, arguments
    debug: protocol, data details
    """
    # The position of a name in this list is its numeric level
    names = ['emerg', 'alert', 'crit', 'err', 'warning',
             'notice', 'info', 'debug']
    # If level is an integer between 0 and 7, pass it back
    if level in range(8):
        return level
    if level in names:
        return names.index(level)
    # Levels taken from the command line are strings, so "7" has to be
    # accepted as well as 7.
    if isinstance(level, str):
        try:
            value = int(level)
        except ValueError:
            value = None
        if value in range(8):
            return value
    output('warning', 'Unknown log level "%s", assuming "emerg"' % level)
    return 0
######################################################################## | |
# Take an integer or string log level and return a string log level | |
# | |
def string_log_level(level):
    """Return the level name for `level`.

    A valid level name is returned unchanged and an integer from 0 to 7
    is mapped to its name.  Anything else logs a warning and yields
    'emerg'.
    """
    names = ['emerg', 'alert', 'crit', 'err', 'warning',
             'notice', 'info', 'debug']
    # Already a valid name: nothing to translate
    if level in names:
        return level
    # An integer between 0 and 7 indexes straight into the name list
    if level in range(8):
        return names[level]
    output('warning', 'Unknown log level "%s", assuming "emerg"' % level)
    return 'emerg'
######################################################################## | |
def output(level, message):
    """Print `message` when `level` is within the configured verbosity.

    The line has the form "<program>: (<level name>) <message>".
    """
    global config
    if not numeric_log_level(level) <= config['verbose']:
        return
    line = "%s: (%s) %s" % (config['self'],
                            string_log_level(level),
                            message)
    print(line)
######################################################################## | |
# TODO | |
def usage():
    """Print the command line help text.

    The program name and the default mailbox are taken from config.
    """
    global config
    template = '''Usage: %s [OPTION]... FILE
Import contents of mbox FILE to zimbra.
-m MAILBOX when not using -r, import to MAILBOX (default: %s)
-r recursively import mbox files (FILE must be a directory)
-u USER authenticate as USER
-v LEVEL set verbosity to LEVEL (syslog priority style)
Note: "." characters are not allowed in mailbox names or directory
names. Such characters will be converted to "_" on the server.
When using -r, mailbox names will be derived from mbox file
hierarchy structure.
Warning: Please do not delete source mail until you have verified that
it has been imported successfully. This tool has been written with
safety in mind, but there are no guarantees.
'''
    values = (config['self'], config['mailbox'])
    print(template % values)
######################################################################## | |
def msgfactory(fp):
    """Parse a message from file object `fp`.

    Returns the parsed message, or '' when parsing raises
    MessageParseError.
    """
    try:
        parsed = email.message_from_file(fp)
    except email.Errors.MessageParseError:
        # Don't return None since that will stop the mailbox iterator
        parsed = ''
    return parsed
######################################################################## | |
# Extract the subject from a string representing an email message | |
def get_subject(msg_txt):
    """Return the Subject header line of the message text `msg_txt`.

    The whole line (for example "Subject: hello") is returned with
    trailing whitespace removed.  Only the header section, up to the
    first empty line, is searched.  When no subject is found a warning
    is logged and '' is returned.
    """
    output('info', 'get_subject(): starting')
    for line in msg_txt.splitlines():
        # Header field names are case-insensitive
        if line[:8].lower() == 'subject:':
            return line.rstrip()
        if line == '':
            # End of headers. If we reached here, there is no subject.
            break
    # Also reached when the text has no empty line at all, so the caller
    # always receives a string rather than None.
    output('warning', 'Message does not have a subject')
    return ''
######################################################################## | |
def do_import(from_file, to_mailbox):
    """Upload every message of mbox file `from_file` to `to_mailbox`.

    Each message is written to a temporary file, which is handed to
    "zmmailbox -z -m USER am MAILBOX FILE" (USER is config['user']) and
    removed afterwards.  Messages that msgfactory() could not parse are
    skipped with a warning; a failing zmmailbox call is logged at level
    'err' and the import continues with the next message.
    """
    # Imported here because the module's import block does not provide it
    import subprocess
    global config
    output('info', 'do_import(%s, %s): starting' % (from_file, to_mailbox))
    fp = open(from_file, 'r')
    try:
        mbox = mailbox.UnixMailbox(fp, msgfactory)
        for msg_obj in mbox:
            # msgfactory() returns '' for a message it could not parse
            if not hasattr(msg_obj, 'as_string'):
                output('warning',
                       'Skipping unparseable message in %s' % from_file)
                continue
            msg_txt = msg_obj.as_string(unixfrom=False)
            msg_fp, msg_fname = tempfile.mkstemp()
            try:
                try:
                    os.write(msg_fp, msg_txt)
                finally:
                    os.close(msg_fp)
                subject = get_subject(msg_txt)
                output('notice', 'Uploading message from %s: %s' %
                       (msg_fname, subject))
                # An argument list (no shell) keeps user and mailbox
                # names containing spaces or shell metacharacters intact.
                command = ['zmmailbox', '-z', '-m', config['user'],
                           'am', to_mailbox, msg_fname]
                try:
                    status = subprocess.call(command)
                except OSError:
                    output('err', 'Could not run zmmailbox for %s' %
                           msg_fname)
                else:
                    if status != 0:
                        output('err',
                               'zmmailbox exited with status %s for %s' %
                               (status, msg_fname))
            finally:
                # The command has finished, so the copy is not needed
                os.remove(msg_fname)
    finally:
        fp.close()
######################################################################## | |
# Run the import only when executed as a script, so that the module can
# be imported (for example by tests) without side effects.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment