Skip to content

Instantly share code, notes, and snippets.

@aleksas
Last active March 4, 2020 09:28
Show Gist options
  • Save aleksas/8c57bd328c70a44228ec8edb9a63395f to your computer and use it in GitHub Desktop.
Save aleksas/8c57bd328c70a44228ec8edb9a63395f to your computer and use it in GitHub Desktop.
Service for copying POP3 incoming mail to IMAP (adapted from https://github.com/kyokuheki/gmail-importer3)
export POP3_SERVER='pop3.host.com'
export POP3_PORT='110'
export POP3_USER='pop3_username'
export POP3_PASS='pop3_password'
export IMAP_SERVER='imap.host.com'
export IMAP_PORT='993'
export IMAP_USER='imap_username'
export IMAP_PASS='imap_password'
export IMAP_DST_MBOX='INBOX'
export INTERVAL='30'
export DELETE_AFTER='14'
python3 mail_importer.py --pop3_tls
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import argparse
import traceback
import pickle
import logging
import logging.handlers
import time
import io
# pop/imap/email
import poplib
import imaplib
import email
#import dateutil.tz
import datetime
# for gmail api
import httplib2
import base64
# Name of the application-wide logger.
APPLICATION_NAME = "mail-importer"
# Base name of this script; used as the prefix for the ".cache" and ".log" files.
FILENAME = os.path.basename(__file__)
# Log record layout for the console handler.
stdout_fmt = '%(asctime)s %(levelname)s %(name)s - %(message)s'
# More detailed log record layout (process id, function, file and line) for the log file.
file_fmt = '%(asctime)s %(process)d %(levelname)s %(name)s:%(funcName)s(%(filename)s:%(lineno)d) - %(message)s'
# Module-level logger shared by every function in this file.
logger = logging.getLogger(APPLICATION_NAME)
class Cache(object):
    """Persistent set of POP3 message UIDs that were already imported.

    The set is pickled to ``<script name>.cache`` (a relative path, so it is
    resolved against the current working directory) and reloaded on start-up
    unless the caller asks for a clean cache.
    """

    pkl_name = FILENAME + ".cache"

    def __init__(self, is_clear):
        """Start with an empty set if *is_clear* is true, else load from disk."""
        if is_clear:
            self.ids = set()
        else:
            self.ids = self.load()

    def add(self, id):
        """Remember *id* as imported."""
        self.ids.add(id)

    def remove(self, id):
        """Forget *id*. Raises ``KeyError`` if it is not in the cache."""
        self.ids.remove(id)

    def is_member(self, id):
        """Return True if *id* has been recorded as imported."""
        return id in self.ids

    def load(self):
        """Return the set stored in the cache file, or an empty set.

        An unreadable, truncated or otherwise corrupt cache file is logged
        and treated as empty instead of aborting the program.
        """
        try:
            with open(self.pkl_name, mode="rb") as f:
                # NOTE(security): unpickling can execute arbitrary code, so
                # the cache file must only ever be written by this program.
                ids = pickle.load(f)
        except IOError as e:
            logger.error("IOError: %s" % e)
            return set()
        except (EOFError, pickle.UnpicklingError, AttributeError,
                ImportError, IndexError) as e:
            # A partially written or damaged file must not crash start-up.
            logger.error("Corrupt cache file %s: %s" % (self.pkl_name, e))
            return set()
        if not isinstance(ids, set):
            logger.error("Unexpected cache content in %s: %s"
                         % (self.pkl_name, type(ids).__name__))
            return set()
        return ids

    def dump(self):
        """Write the current set to the cache file.

        The data is written to a temporary file first and then moved into
        place, so an interrupted write cannot leave a truncated cache behind.
        """
        tmp_name = self.pkl_name + ".tmp"
        with open(tmp_name, mode="wb") as f:
            pickle.dump(self.ids, f)
        os.replace(tmp_name, self.pkl_name)
# helpers
def set_logger(quiet, verbose, debug, colorize=True):
    """Configure the application logger with a console and a file handler.

    Args:
        quiet: number of ``-q`` flags; each one raises the level by 10.
        verbose: number of ``-v`` flags; each one lowers the level by 10.
        debug: when true, force the level to DEBUG regardless of the counts.
        colorize: accepted for compatibility; currently unused.

    The console handler uses ``stdout_fmt``. The file handler writes to
    ``<script name>.log`` using ``file_fmt`` and rotates at 4 MiB, keeping
    4 backups.
    """
    if debug:
        level = logging.DEBUG
    else:
        # Never drop below DEBUG: a computed level of 0 is logging.NOTSET,
        # which makes the logger inherit the root logger's WARNING level and
        # so "-vv" would end up *less* verbose than "-v".
        level = max(logging.DEBUG, logging.INFO + 10 * quiet - 10 * verbose)

    console = logging.StreamHandler()
    console.setLevel(level)
    console.setFormatter(logging.Formatter(stdout_fmt))

    logfile = logging.handlers.RotatingFileHandler(
        FILENAME + '.log', maxBytes=1024 * 1024 * 4, backupCount=4)
    # The handler accepts everything; the logger level set below still
    # decides which records reach it.
    logfile.setLevel(logging.DEBUG)
    logfile.setFormatter(logging.Formatter(file_fmt))

    log = logging.getLogger(APPLICATION_NAME)
    log.setLevel(level)
    log.addHandler(console)
    log.addHandler(logfile)
# email
def parse_date(msg):
    """Return the message's ``Date`` header as a timezone-aware UTC datetime.

    Args:
        msg: a parsed ``email.message.Message``.

    Returns:
        ``datetime.datetime`` with ``tzinfo=datetime.timezone.utc``.

    Raises:
        ValueError: if the header is missing or cannot be parsed.
    """
    # Imported here so the function does not depend on some other module
    # having loaded the ``email.utils`` submodule already.
    from email.utils import parsedate_to_datetime

    m_date = msg['date']
    if m_date is None:
        raise ValueError("message has no Date header")
    try:
        d_date = parsedate_to_datetime(str(m_date))
    except (TypeError, ValueError) as e:
        raise ValueError(
            "unparseable Date header {!r}: {}".format(str(m_date), e)) from e
    if d_date.tzinfo is None:
        # A "-0000" zone yields a naive datetime whose fields are already UTC.
        return d_date.replace(tzinfo=datetime.timezone.utc)
    # Convert rather than relabel: replacing tzinfo on an aware datetime
    # would shift the instant by the sender's UTC offset.
    return d_date.astimezone(datetime.timezone.utc)
def parse_message(bytes: bytes):
    """Parse a raw message and return ``(message, date, subject)``.

    The date comes from ``parse_date``; the subject is the raw ``Subject``
    header value (``None`` when the header is absent).
    """
    parsed = email.message_from_bytes(bytes)
    return (parsed, parse_date(parsed), parsed['subject'])
import time
def import_(imap, msg, date, mail, dst_mbox):
    """Append one message to *dst_mbox* on the IMAP server.

    Args:
        imap: logged-in ``imaplib.IMAP4`` connection.
        msg: raw message bytes; unused, kept for interface compatibility.
        date: value passed to APPEND as the message's internal date.
        mail: parsed message; its ``as_bytes()`` output is what gets uploaded.
        dst_mbox: destination mailbox name; falls back to "INBOX" when empty.

    Returns:
        The ``(typ, data)`` tuple from ``imap.append``, or ``None`` when the
        call raised ``TypeError`` or ``imap.error`` (the error is logged).
    """
    mailbox = dst_mbox if dst_mbox else "INBOX"
    try:
        result = imap.append(mailbox, '', date, mail.as_bytes())
    except TypeError as e:
        logger.error("TypeError: '{}'".format(e))
        return None
    except imap.error as e:
        logger.error("imap.error: '{}'".format(e))
        return None
    if result[0] != 'OK':
        # The server can reject an APPEND without raising; make that visible.
        logger.error("imap: APPEND to '{}' failed: {}".format(mailbox, result))
    return result
# pop3
def login_pop3(host, username, pass_, port=0, is_tls=False, is_debug=False):
    """Open a POP3 connection, authenticate, and return it.

    Args:
        host: POP3 server host name.
        username: account name sent with USER.
        pass_: password sent with PASS.
        port: TCP port; a falsy value selects poplib's default for the
            chosen transport.
        is_tls: use ``poplib.POP3_SSL`` instead of plain ``poplib.POP3``.
        is_debug: turn on poplib's protocol debug output (level 1).

    Returns:
        The authenticated poplib connection object.
    """
    if is_tls:
        factory, fallback_port = poplib.POP3_SSL, poplib.POP3_SSL_PORT
    else:
        factory, fallback_port = poplib.POP3, poplib.POP3_PORT
    conn = factory(host, port=port or fallback_port)
    if is_debug:
        conn.set_debuglevel(1)
    conn.user(username)
    conn.pass_(pass_)
    logger.info(conn.getwelcome())
    return conn
def process_emails_pop3(args, cache):
    """Copy every not-yet-imported POP3 message into the IMAP mailbox.

    Messages are walked from the highest message number down to 1. A message
    whose UID is already in *cache* is skipped. After a successful import
    the UID is cached; if ``args.delete_after`` is set and the message is
    older than that many days, it is deleted from the POP3 server and its
    UID dropped from the cache again.

    Args:
        args: parsed command-line namespace (server, credential, TLS,
            ``delete_after``, ``force`` and ``debug`` settings).
        cache: ``Cache`` instance holding the UIDs already imported.

    Raises:
        Exception: any per-message error is re-raised unless ``args.force``
            is set, in which case it is logged and the message is skipped.
    """
    # Pass the port explicitly; without it the TLS flag would land in the
    # "port" slot and the debug flag in the "is_tls" slot.
    M = login_pop3(args.pop3_server, args.pop3_user, args.pop3_pass,
                   args.pop3_port, args.pop3_tls, args.debug)
    imap_M = None
    try:
        imap_M = login_imap(args.imap_server, args.imap_user, args.imap_pass,
                            args.imap_port, args.imap_tls, args.debug)
        imap_M.select(args.imap_dst_mbox)

        numMessages = M.stat()[0]
        logger.info("POP3 server has {} messages.".format(numMessages))
        logger.debug("M.uidl: {}".format(M.uidl()))

        # Nothing to do; the outer "finally" still closes both connections.
        if numMessages == 0:
            return

        cutoff = None
        if args.delete_after:
            # Compare in real UTC; message dates are timezone-aware.
            cutoff = (datetime.datetime.now(datetime.timezone.utc)
                      - datetime.timedelta(days=args.delete_after))

        try:
            for i in range(numMessages, 0, -1):
                try:
                    uidl_response = M.uidl(i)
                    uid = uidl_response.split()[2]
                    logger.info("msg: {}: {}: {}".format(i, uid, uidl_response))
                    logger.debug("cache: {}".format(cache.ids))
                    if cache.is_member(uid):
                        continue

                    raw_msg_bytes = b'\r\n'.join(M.retr(i)[1])
                    mail, d, s = parse_message(raw_msg_bytes)
                    logger.info("parsed: {}: {}: {}: {}".format(i, uid, d, s))

                    result = import_(imap_M, raw_msg_bytes, d, mail, args.imap_dst_mbox)
                    if not result or result[0] != 'OK':
                        # Do not cache (or delete) a message that never made
                        # it to the IMAP server; it is retried next run.
                        logger.warning("import failed, will retry: {}: {}: {}".format(i, uid, result))
                        continue
                    logger.info("imported: {}: {}: {}: {}".format(i, uid, d, s))

                    # set its seen flag
                    cache.add(uid)
                    if cutoff is not None and d < cutoff:
                        M.dele(i)
                        logger.info("delete: %s: %s: %s: %s" % (i, d, uid, s))
                        cache.remove(uid)
                except Exception as e:
                    if not args.force:
                        raise
                    logger.exception('Exception {} occured. Skip the email.'.format(e))
                    logger.warning('Ignore the exception and continue processing.')
        finally:
            # dump seen flag cache
            cache.dump()
    finally:
        # Close both connections even if one of the cleanup steps fails.
        try:
            r = M.quit()
            logger.info(r)
        finally:
            if imap_M is not None:
                logout_imap(imap_M, True)
# imap
def login_imap(host, user, password, port=0, is_tls=False, is_debug=False):
    """Open an IMAP connection, authenticate, and return it.

    Args:
        host: IMAP server host name.
        user: account name.
        password: account password.
        port: TCP port; a falsy value selects imaplib's default for the
            chosen transport.
        is_tls: use ``imaplib.IMAP4_SSL`` instead of plain ``imaplib.IMAP4``.
        is_debug: set the module-wide ``imaplib.Debug`` level to 4.

    CRAM-MD5 authentication is used when the server advertises it,
    otherwise a plain LOGIN is performed.
    """
    if is_debug:
        imaplib.Debug = 4
    if is_tls:
        session = imaplib.IMAP4_SSL(host, port=port or imaplib.IMAP4_SSL_PORT)
    else:
        session = imaplib.IMAP4(host, port=port or imaplib.IMAP4_PORT)
    if 'AUTH=CRAM-MD5' in session.capabilities:
        authenticate = session.login_cram_md5
    else:
        authenticate = session.login
    typ, data = authenticate(user, password)
    logger.info("{} {}".format(typ, data))
    return session
def logout_imap(M, expunge=False):
    """Close the selected mailbox and log out of the IMAP server.

    Args:
        M: ``imaplib.IMAP4`` connection with a mailbox selected.
        expunge: when true, send an explicit EXPUNGE before closing.

    LOGOUT is always attempted, even if EXPUNGE or CLOSE raises, so the
    connection is not left open; the original error still propagates.
    """
    try:
        if expunge:
            typ, data = M.expunge()
            logger.debug("imap: {} {}".format(typ, data))
        typ, data = M.close()
        logger.debug("imap: {} {}".format(typ, data))
    finally:
        typ, data = M.logout()
        logger.debug("imap: {} {}".format(typ, data))
def move_mbox(M, uid, dst):
    """Move the message with UID *uid* into mailbox *dst*.

    The message is copied with ``UID COPY`` and then flagged ``\\Deleted``
    in the current mailbox. If the copy does not return OK, the message is
    left untouched so it cannot be lost.

    Args:
        M: ``imaplib.IMAP4`` connection with the source mailbox selected.
        uid: UID of the message to move.
        dst: name of the destination mailbox.
    """
    # copy
    typ, data = M.uid('COPY', uid, dst)
    logger.debug("imap: {} {}".format(typ, data))
    if typ != 'OK':
        logger.error("imap: COPY of uid {} to {} failed, message not flagged deleted: {} {}".format(uid, dst, typ, data))
        return
    # flag delete (raw string: "\D" is not a valid escape sequence)
    typ, data = M.uid('STORE', uid, '+FLAGS', r'(\Deleted)')
    logger.debug("imap: {} {}".format(typ, data))
def main():
    """Run the import once, or repeatedly when an interval is configured.

    Reads the module-level ``args`` namespace. Without ``args.interval`` a
    single import pass is performed. With an interval, passes repeat
    forever with ``args.interval`` seconds of sleep in between; Ctrl+C
    exits cleanly. In interval mode with ``args.force`` set, a failed pass
    is logged and the loop continues; otherwise the exception propagates.
    """
    # load seen flag cache
    cache = Cache(args.nocache)

    if not args.interval:
        process_emails_pop3(args, cache)
        return

    while True:
        try:
            try:
                process_emails_pop3(args, cache)
            except Exception as e:
                if not args.force:
                    raise
                # Keep the service alive across transient failures.
                logger.exception('Exception {} occured. Retry after the interval.'.format(e))
            logger.info("waiting interval...")
            time.sleep(args.interval)
        except KeyboardInterrupt:
            sys.exit("Ctrl+C pressed. Shutting down.")
if __name__ == '__main__':
    # Command-line entry point. Most options default to an environment
    # variable so the script can run as a service without arguments.
    parser = argparse.ArgumentParser(description='Mail Importer copies the emails found on a POP3 server into a mailbox on an IMAP server.')
    parser.add_argument('-l', '--label', action="store", default=os.getenv("IMPORTED_LABEL", "_imported"))
    parser.add_argument('--pop3_server', action="store", default=os.getenv("POP3_SERVER", 'localhost'))
    parser.add_argument('--pop3_port', action="store", type=int, default=os.getenv("POP3_PORT", 0))
    parser.add_argument('--pop3_proto', action="store", default=os.getenv("POP3_PROTOCOL", 'POP3'), choices=['POP3', 'IMAP'])
    parser.add_argument('--pop3_user', action="store", default=os.getenv("POP3_USER"))
    parser.add_argument('--pop3_pass', action="store", default=os.getenv("POP3_PASS"))
    parser.add_argument('--imap_server', action="store", default=os.getenv("IMAP_SERVER", 'localhost'))
    parser.add_argument('--imap_port', action="store", type=int, default=os.getenv("IMAP_PORT", 0))
    parser.add_argument('--imap_user', action="store", default=os.getenv("IMAP_USER"))
    parser.add_argument('--imap_pass', action="store", default=os.getenv("IMAP_PASS"))
    parser.add_argument('--imap_dst_mbox', action="store", default=os.getenv("IMAP_DST_MBOX", "INBOX"), help="destination imap mailbox")
    parser.add_argument('--move', action="store_true", help="Move imported messages into the destination mailbox")
    parser.add_argument('--delete_after', action="store", type=int, default=os.getenv("DELETE_AFTER"), help="Delete imported messages after N days")
    parser.add_argument('--pop3_tls', action="store_true", help="Enable TLS/SSL for the POP3 connection")
    parser.add_argument('--imap_tls', action="store_true", default=True, help="Enable TLS/SSL for the IMAP connection (default)")
    # --imap_tls defaults to True and is store_true, so on its own it can
    # never be switched off; this flag provides the way to disable it.
    parser.add_argument('--no_imap_tls', dest='imap_tls', action="store_false", help="Disable TLS/SSL for the IMAP connection")
    parser.add_argument('-i', '--interval', action="store", type=int, default=os.getenv("INTERVAL"), help="Wait interval seconds between import process. Type Ctrl+c if you want stop program.")
    parser.add_argument('-f', '--force', action="store_true", help="Ignore the exception and continue the import process, if used with the -i option.")
    parser.add_argument('--nocache', action="store_true", help="Ignore seen flag cache.")
    parser.add_argument('-v', '--verbose', action='count', default=0, help="Make the operation more talkative")
    parser.add_argument('-q', '--quiet', action='count', default=0, help="Quiet mode")
    parser.add_argument('-d', '--debug', action="store_true", help="Enable debug message.")
    args = parser.parse_args()

    # set logger
    set_logger(args.quiet, args.verbose, args.debug)
    if args.debug:
        httplib2.debuglevel = 1 + args.verbose

    # Never write the account passwords to the console or the log file.
    redacted = {
        key: ('***' if key in ('pop3_pass', 'imap_pass') and value else value)
        for key, value in vars(args).items()
    }
    logger.debug(argparse.Namespace(**redacted))
    logger.debug('logging level: %s' % logger.getEffectiveLevel())

    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment