Created
May 15, 2020 06:40
-
-
Save mydreambei-ai/51430dcc2deef4a5536abcd85824bf64 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import email
import email.errors
import email.header
import imaplib
import logging
import logging.config
import os
import time
# Logging setup: the root logger sends records of level INFO and above to a
# single StreamHandler that uses the timestamped format 'f'.
logging_config = dict(
    version=1,
    formatters={'f': {
        'format': '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'
    }},
    handlers={'h': {
        'class': 'logging.StreamHandler',
        'formatter': 'f',
        'level': logging.DEBUG
    }},
    root={
        'handlers': ['h'],
        'level': logging.INFO,
    },
)
logging.config.dictConfig(logging_config)

# Module-level logger used by the rest of this file.
logger = logging.getLogger(__name__)
class MailMonitorException(Exception):
    """Raised when the IMAP server answers a command with a status other than OK."""
class MailMonitor:
    """Poll an IMAP mailbox for unseen messages and save their attachments.

    Connection settings live in class attributes so they can be overridden by
    subclassing; they can also be supplied per instance through the
    constructor. Attachments are written to the current working directory.

    The instance can be used as a context manager, in which case the IMAP
    connection is closed on exit.
    """

    imapserver = "imap.exmail.qq.com"
    _username = None
    _password = None
    _interval = 10  # seconds slept between two UNSEEN searches

    def __init__(self, username=None, password=None, imapserver=None,
                 interval=None):
        """Open an SSL connection to the IMAP server and log in.

        Args:
            username: Login name; ``None`` keeps the class attribute.
            password: Login password; ``None`` keeps the class attribute.
            imapserver: Server host name; ``None`` keeps the class attribute.
            interval: Seconds between polls; ``None`` keeps the class
                attribute.

        Raises:
            MailMonitorException: If the server does not answer ``OK`` to
                NOOP or LOGIN.
        """
        if username is not None:
            self._username = username
        if password is not None:
            self._password = password
        if imapserver is not None:
            self.imapserver = imapserver
        if interval is not None:
            self._interval = interval

        self.m = imaplib.IMAP4_SSL(self.imapserver)
        try:
            ok, _ = self.m.noop()
            self.check_ok(ok, "imapserver cannot connect")
            self.login()
        except Exception:
            # Do not leak the socket when the handshake or the login fails.
            self.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Log out and close the connection (best effort)."""
        try:
            self.m.logout()
        except (imaplib.IMAP4.error, OSError):
            # The session is already unusable; just release the socket.
            self.m.shutdown()

    def login(self):
        """Authenticate with the configured user name and password."""
        ok, _ = self.m.login(self._username, self._password)
        self.check_ok(ok, "username, password error")

    def check_ok(self, ok, msg):
        """Raise ``MailMonitorException(msg)`` unless *ok* equals ``"OK"``."""
        if ok != "OK":
            raise MailMonitorException(msg)

    def get_mail_ids(self, data):
        """Return the message ids contained in a SEARCH response.

        Args:
            data: The data list returned by ``IMAP4.search``; its first item
                is expected to be a bytes string of whitespace-separated ids.

        Returns:
            A list of id strings; empty when *data* is empty or its first
            item is empty or ``None``.
        """
        if not data or not data[0]:
            return []
        # split() without an argument ignores repeated/trailing whitespace,
        # so no empty ids are produced.
        return data[0].decode().split()

    def monitor_dir(self, dirname):
        """Watch mailbox *dirname* forever and save attachments of new mail.

        Each round searches for UNSEEN messages, fetches every hit, stores
        its attachments via ``parse_email`` and then flags it as seen. The
        method only returns by raising.

        Raises:
            MailMonitorException: If SELECT, SEARCH, FETCH or STORE does not
                answer ``OK``.
        """
        ok, _ = self.m.select(dirname)
        self.check_ok(ok, "dirname error")
        while True:
            ok, data = self.m.search(None, '(UNSEEN)')
            logger.info("unseen data: %s", data)
            self.check_ok(ok, "search unseen data error")
            for mid in self.get_mail_ids(data):
                ok, fetched = self.m.fetch(mid, 'RFC822')
                self.check_ok(ok, "fetch mail error")
                self.parse_email(fetched)
                ok, _ = self.m.store(mid, '+FLAGS', r'(\Seen)')
                self.check_ok(ok, "set seen error")
            time.sleep(self._interval)

    def parse_email(self, mail):
        """Write every attachment of a fetched message to the working directory.

        Args:
            mail: The data list returned by ``IMAP4.fetch``; the raw message
                is taken from the first ``(envelope, bytes)`` tuple in it.
        """
        raw_message = next(
            (item[1] for item in (mail or ())
             if isinstance(item, tuple) and len(item) > 1),
            None,
        )
        if raw_message is None:
            logger.warning("fetch response contains no message: %r", mail)
            return
        for filename, payload in self._iter_attachments(raw_message):
            logger.info("get filename %s", filename)
            with open(filename, "wb") as fp:
                fp.write(payload)

    @classmethod
    def _iter_attachments(cls, raw_message):
        """Yield ``(safe_filename, payload_bytes)`` for each attachment part."""
        message = email.message_from_bytes(raw_message)
        for part in message.walk():
            raw_name = part.get_filename()
            if not raw_name:
                continue
            filename = cls._safe_filename(raw_name)
            # get_payload(decode=True) yields None for multipart containers.
            payload = part.get_payload(decode=True)
            if filename is None or payload is None:
                continue
            yield filename, payload

    @staticmethod
    def _safe_filename(raw_name):
        """Decode an attachment name and strip any directory components.

        The name comes from untrusted mail, so encoded words are decoded
        chunk by chunk (keeping all of them) and only the final path
        component is kept to prevent writing outside the target directory.

        Returns:
            The bare file name, or ``None`` when nothing usable is left.
        """
        try:
            chunks = email.header.decode_header(raw_name)
        except email.errors.HeaderParseError:
            chunks = [(raw_name, None)]

        pieces = []
        for chunk, charset in chunks:
            if isinstance(chunk, bytes):
                try:
                    chunk = chunk.decode(charset or "ascii", errors="replace")
                except LookupError:
                    # Charset label unknown to Python.
                    chunk = chunk.decode("utf-8", errors="replace")
            pieces.append(chunk)

        name = "".join(pieces).replace("\x00", "")
        # Treat both separator styles as directories, whatever the host OS.
        name = os.path.basename(name.replace("\\", "/")).strip()
        if name in ("", ".", ".."):
            return None
        return name
if __name__ == "__main__":
    # The mailbox name is written in IMAP modified UTF-7; the "&...-" part
    # encodes a non-ASCII (Chinese) parent folder name, followed by the
    # "huisu" sub-folder.
    MailMonitor().monitor_dir("&UXZO1mWHTvZZOQ-/huisu")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Monitor an email folder and automatically download attachments.