Skip to content

Instantly share code, notes, and snippets.

@richardsonlima
Created June 15, 2016 00:03
Show Gist options
  • Save richardsonlima/335a5e288b5fc7d165e157c97ebcd2b2 to your computer and use it in GitHub Desktop.
Save richardsonlima/335a5e288b5fc7d165e157c97ebcd2b2 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# Upload mbox format email to zimbra
########################################################################
# Libraries
########################################################################
import email, email.Errors, mailbox, imaplib, sys, getopt
import os.path, StringIO, re, tempfile
########################################################################
# Configuration defaults
########################################################################
# Store configuration in a dictionary so that we need only one argument
# to "global" in functions to reference all the values.
#
# Default settings, kept together in one module-level dictionary.
config = {
    # Name this script was invoked as
    'self': os.path.basename(sys.argv[0]),
    # "notice"; Syslog-style priority level
    'verbose': 5,
    # Destination mailbox
    'mailbox': 'Inbox',
    # 1 means a whole directory tree is imported, 0 a single mbox file
    'recursive_mode': 0,
}
########################################################################
# Functions
########################################################################
def main():
    """Parse the command line, then run the single or recursive import."""
    output('info', 'main(): starting')
    process_options()
    if config['recursive_mode'] != 1:
        output('debug', 'main(): about to run single_import()')
        single_import()
    else:
        output('debug', 'main(): about to run recursive_import()')
        recursive_import()
    output('info', 'main(): completed')
########################################################################
# TODO test mbox files with . in the name
# TODO test both absolute and relative paths in import
# Given an mbox file path, translate it to an IMAP-style mailbox path
def source2target(source):
    """Translate an mbox file path into an IMAP-style mailbox path.

    Every "." in the path is replaced by "_" (with a warning), every "/"
    becomes the "." hierarchy separator, and the leading component (the
    containing directory) is stripped off.

    source -- relative path of the mbox file, "/" separated
    Returns the mailbox path as a string.  A path that has no separator
    is returned as is, since there is no containing directory to strip.
    """
    output('info', 'source2target(%s): starting' % source)
    target = source
    if '.' in source:
        output('warning', 'mbox file "%s" contains "." character' % source)
        output('warning', 'Replacing "." with "_"')
        target = target.replace('.', '_')
    # Translate to IMAP-style path separator (replace "/" with ".")
    target = target.replace('/', '.')
    # Strip off the containing directory (up to the first "." character).
    # The separator must be matched literally: an unescaped "." matches
    # any character and would swallow a name that has no separator at all.
    target = re.sub(r'^[^.]+\.', '', target)
    output('info', 'source2target(%s): returning %s' % (source, target))
    return target
########################################################################
# Upload all files in a hierarchy to an IMAP server
def recursive_import():
    """Upload every mbox file below config['source'] to the server.

    The mailbox for each file is derived from its path relative to the
    parent of the source directory (see source2target()).  All mailboxes
    are passed to create_mailboxes() before any message is imported.

    Exits with status 1 if config['source'] is not a directory.
    """
    output('info', 'recursive_import(): starting')
    if not os.path.isdir(config['source']):
        output('crit', 'Argument must be a directory when using -r')
        output('crit', 'Given "%s"' % config['source'])
        sys.exit(1)
    # Drop trailing "/" characters: os.path.dirname('a/b/') is 'a/b', which
    # would make the source directory itself look like the base to strip.
    source_root = config['source'].rstrip('/') or '/'
    source_list = build_file_list(source_root)
    output('debug', 'recursive_import(): source_list = %s' % source_list)
    # Need to know what to strip off when creating targets on the
    # IMAP server.  If the source directory was specified as a path with
    # more than one component, strip it down to the last component (the
    # containing directory) since that is what the hierarchy on the IMAP
    # server will be created relative to.
    base = os.path.dirname(source_root)
    if base and not base.endswith('/'):
        base = base + '/'
    output('debug', 'recursive_import(): base = %s' % base)
    target_list = []
    for source in source_list:
        # Plain prefix comparison; the base may contain characters that
        # are special in a regular expression.
        relative = source
        if base and source.startswith(base):
            relative = source[len(base):]
        target_list.append(source2target(relative))
    output('debug', 'recursive_import(): target_list = %s' % target_list)
    create_mailboxes(target_list)
    for source, target in zip(source_list, target_list):
        output('debug', 'recursive_import(): source = %s' % source)
        output('debug', 'recursive_import(): target = %s' % target)
        output('notice', 'Starting import of %s to %s' % (source, target))
        do_import(source, target)
        output('notice', 'Finished import of %s to %s' % (source, target))
    output('info', 'recursive_import(): completed')
########################################################################
def single_import():
    """Import the mbox file config['source'] into config['mailbox']."""
    output('info', 'single_import(): starting')
    endpoints = (config['source'], config['mailbox'])
    output('notice', 'Starting import of %s to %s' % endpoints)
    do_import(*endpoints)
    output('notice', 'Finished import of %s to %s' % endpoints)
    output('info', 'single_import(): completed')
########################################################################
# Process command line options
def process_options():
    """Process command line options and store the result in config.

    Recognised options:
      -m MAILBOX  sets config['mailbox']
      -r          sets config['recursive_mode'] to 1
      -u USER     sets config['user']
      -v LEVEL    sets config['verbose'] (name or number)
    The single remaining argument is stored in config['source'].

    Prints the usage text and exits when the options cannot be parsed or
    no file argument is given; exits with status 1 when more than one
    file argument is given or when no user was specified.
    """
    output('info', 'process_options(): starting')
    try:
        # "m:" has to be listed here or -m is rejected before the loop
        # below ever sees it.  "i:", "s:" and "p:" are accepted (and
        # ignored) so that existing invocations keep working.
        opts, args = getopt.getopt(sys.argv[1:], "i:m:rs:u:v:p:")
    except getopt.GetoptError:
        usage()
        sys.exit(1)
    for option, argument in opts:
        if option == '-m':
            config['mailbox'] = argument
        elif option == '-r':
            config['recursive_mode'] = 1
        elif option == '-u':
            config['user'] = argument
        elif option == '-v':
            config['verbose'] = argument
    # Make sure desired log level is stored as an integer.  getopt hands
    # "-v 7" over as the string "7", so convert numeric strings first;
    # int() with an explicit base rejects anything that is not a string.
    verbose = config['verbose']
    try:
        verbose = int(verbose, 10)
    except (TypeError, ValueError):
        pass
    config['verbose'] = numeric_log_level(verbose)
    output('debug', 'process_options(): opts = %s' % opts)
    output('debug', 'process_options(): args = %s' % args)
    # Summarize config
    output('debug', "process_options(): config['mailbox'] = %s" %
           config['mailbox'])
    output('debug', "process_options(): config['user'] = %s" %
           config.get('user'))
    output('debug', "process_options(): config['verbose'] = %s" %
           config['verbose'])
    output('debug', "process_options(): config['recursive_mode'] = %s" %
           config['recursive_mode'])
    if len(args) == 0:
        usage()
        sys.exit()
    if len(args) != 1:
        output('crit', 'Too many file arguments: %s' % ' '.join(args))
        output('crit', 'Expecting only one; aborting')
        sys.exit(1)
    # There is no default user and the upload cannot run without one.
    if 'user' not in config:
        output('crit', 'No user specified (use -u USER); aborting')
        sys.exit(1)
    config['source'] = args[0]
    output('info', 'process_options(): completed')
########################################################################
# Return true if file is in mbox format
def is_mbox_file(file):
    """Return true if the first line of the file starts with "From ".

    file -- path of the file to inspect
    """
    output('info', 'is_mbox_file(%s): starting' % file)
    handle = open(file)
    try:
        first_line = handle.readline()
    finally:
        # Close explicitly instead of relying on garbage collection
        handle.close()
    return first_line[0:5] == 'From '
########################################################################
# Given a directory, return a list of contained mbox files
def build_file_list(node):
    """Return the paths of all mbox files found below directory node.

    The mbox files directly inside node come first, followed by the
    results for each subdirectory.  Every path is prefixed with node.
    """
    output('info', 'build_file_list(%s): starting' % node)
    mbox_paths = []
    subdirectories = []
    for name in os.listdir(node):
        path = node + '/' + name
        if os.path.isfile(path):
            if is_mbox_file(path):
                mbox_paths.append(path)
        elif os.path.isdir(path):
            subdirectories.append(path)
    # Descend only after the files of this level have been collected
    for subdirectory in subdirectories:
        mbox_paths.extend(build_file_list(subdirectory))
    return mbox_paths
########################################################################
def create_mailboxes(mailboxes):
    """Report each requested mailbox that is not yet known to exist.

    mailboxes -- list of mailbox names

    Neither the lookup of the existing mailboxes nor the actual creation
    is implemented yet (see the TODO notes); for now every requested
    mailbox is only logged at "notice" level.
    """
    output('info', 'create_mailboxes(%s): starting' % mailboxes)
    # Attempting to create a mailbox that already exists produces an
    # IMAP protocol error, so we only want to attempt to create a
    # mailbox that does not exist. To do this, we need a list of the
    # current mailboxes. The listing is expected to be formatted like:
    #
    # (\Noinferiors) "." "INBOX"
    #
    # We need to extract the string in the INBOX location, using a
    # regular expression to pull out the folder name.
    #
    listing = []  # TODO 'zmmailbox -z -m %s gaf' % config['user']
    current_mailboxes = set()
    for line in listing:
        match = re.search(r'^.*"\." "(.*)"', line)
        # Ignore lines that do not have the expected shape
        if match is not None:
            current_mailboxes.add(match.group(1))
    for name in mailboxes:
        if name not in current_mailboxes:
            output('notice', 'Creating mailbox: ' + name)
            #TODO 'zmmailbox -z -m %s cf %s' % (config['user'], mailbox)
########################################################################
# Take an integer or string log level and return an integer log level
#
def numeric_log_level(level):
    """Return the integer syslog-style priority for level.

    level -- an integer between 0 and 7, the same number written as a
             string (e.g. "7", as received from the command line), or
             one of the names 'emerg', 'alert', 'crit', 'err',
             'warning', 'notice', 'info', 'debug'
    Anything else produces a warning and is treated as "emerg" (0).
    """
    # The position of a name in this tuple is its numeric level
    names = ('emerg', 'alert', 'crit', 'err', 'warning',
             'notice', 'info', 'debug')
    # If level is an integer between 0 and 7, pass it back
    if level in range(8):
        return level
    if level in names:
        return names.index(level)
    # Accept numeric strings; int() with an explicit base raises
    # TypeError for anything that is not a string.
    try:
        number = int(level, 10)
    except (TypeError, ValueError):
        number = None
    if number in range(8):
        return number
    # crit, alert, emerg: critical error, immediate termination
    # err: non-fatal problem
    # warning: possibly negative informational message
    # notice: neutral informational... TODO
    # info: function calls, arguments
    # debug: protocol, data details
    output('warning', 'Unknown log level "%s", assuming "emerg"' % level)
    return 0
########################################################################
# Take an integer or string log level and return a string log level
#
def string_log_level(level):
    """Return the syslog-style priority name for level.

    level -- one of the names 'emerg', 'alert', 'crit', 'err',
             'warning', 'notice', 'info', 'debug', an integer between
             0 and 7, or such a number written as a string
    Anything else produces a warning and is treated as 'emerg'.
    """
    string_levels = ['emerg', 'alert', 'crit', 'err', 'warning',
                     'notice', 'info', 'debug']
    # If level is already a valid string, pass it back
    if level in string_levels:
        return level
    # Turn a numeric string into a number; int() with an explicit base
    # raises TypeError for anything that is not a string.
    try:
        number = int(level, 10)
    except (TypeError, ValueError):
        number = level
    # If level is a number between 0 and 7, return appropriate string
    if number in range(8):
        return string_levels[int(number)]
    output('warning', 'Unknown log level "%s", assuming "emerg"' % level)
    return 'emerg'
########################################################################
def output(level, message):
    """Print message, prefixed with the program name and the level name,
    unless level is numerically above config['verbose']."""
    if numeric_log_level(level) > config['verbose']:
        return
    prefix = config['self']
    label = string_log_level(level)
    print("%s: (%s) %s" % (prefix, label, message))
########################################################################
# TODO
def usage():
    """Print the help text, filled in with the program name and the
    default mailbox taken from config."""
    substitutions = (config['self'], config['mailbox'])
    help_text = '''Usage: %s [OPTION]... FILE
Import contents of mbox FILE to zimbra.
-m MAILBOX when not using -r, import to MAILBOX (default: %s)
-r recursively import mbox files (FILE must be a directory)
-u USER authenticate as USER
-v LEVEL set verbosity to LEVEL (syslog priority style)
Note: "." characters are not allowed in mailbox names or directory
names. Such characters will be converted to "_" on the server.
When using -r, mailbox names will be derived from mbox file
hierarchy structure.
Warning: Please do not delete source mail until you have verified that
it has been imported successfully. This tool has been written with
safety in mind, but there are no guarantees.
''' % substitutions
    print(help_text)
########################################################################
def msgfactory(fp):
    """Parse one message from the file object fp.

    Used as the message factory of the mailbox reader in do_import().
    Returns the parsed message object, or '' when the message cannot be
    parsed.
    """
    # "email.errors" is the canonical module name; the capitalised
    # "email.Errors" alias is not available on newer Python versions.
    import email.errors
    try:
        return email.message_from_file(fp)
    except email.errors.MessageParseError:
        # Don't return None since that will stop the mailbox iterator
        return ''
########################################################################
# Extract the subject from a string representing an email message
def get_subject(msg_txt):
    """Extract the subject from a string representing an email message.

    msg_txt -- complete message text (headers, blank line, body)
    Returns the first "Subject:" header line with trailing whitespace
    removed, or '' (after logging a warning) when the headers contain
    no subject.  Continuation lines of a folded header are not included.
    """
    output('info', 'get_subject(): starting')
    for line in msg_txt.splitlines():
        # Header names are case-insensitive
        if re.match(r'Subject:', line, re.IGNORECASE):
            return line.rstrip()
        if line == '':
            # End of headers. If we reached here, there is no subject.
            break
    # Also reached when the text has no blank line at all, so that the
    # caller always gets a string back.
    output('warning', 'Message does not have a subject')
    return ''
########################################################################
def do_import(from_file, to_mailbox):
    """Upload every message of an mbox file to a mailbox using zmmailbox.

    from_file  -- path of the mbox file to read
    to_mailbox -- name of the destination mailbox

    Each message is written to a temporary file which is handed to
    "zmmailbox -z -m USER am MAILBOX FILE" (USER is config['user']) and
    removed again afterwards.  A non-zero exit status of zmmailbox is
    logged and the import continues with the next message.
    Raises OSError if zmmailbox cannot be started.
    """
    import subprocess
    output('info', 'do_import(%s, %s): starting' % (from_file, to_mailbox))
    fp = open(from_file, 'r')
    try:
        mbox = mailbox.UnixMailbox(fp, msgfactory)
        for msg_obj in mbox:
            # msgfactory() yields '' for a message it could not parse
            if not hasattr(msg_obj, 'as_string'):
                output('warning',
                       'Skipping unparsable message in %s' % from_file)
                continue
            msg_txt = msg_obj.as_string(unixfrom=False)
            msg_fd, msg_fname = tempfile.mkstemp()
            try:
                try:
                    os.write(msg_fd, msg_txt)
                finally:
                    os.close(msg_fd)
                subject = get_subject(msg_txt)
                output('notice', 'Uploading message from %s: %s' %
                       (msg_fname, subject))
                # Pass an argument list (no shell) so that user, mailbox
                # and file names are never interpreted by a shell.
                command = ['zmmailbox', '-z', '-m', config['user'],
                           'am', to_mailbox, msg_fname]
                try:
                    status = subprocess.call(command)
                except OSError:
                    output('crit', 'Unable to run zmmailbox')
                    raise
                if status != 0:
                    output('err', 'zmmailbox exited with status %s: %s' %
                           (status, subject))
            finally:
                # The temporary copy is not needed once zmmailbox returns
                os.remove(msg_fname)
    finally:
        fp.close()
########################################################################
# Run the import only when executed as a script, so that importing this
# file (e.g. to reuse its functions) does not start an upload.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment