-
-
Save artschwagerb/7602784 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""MailBox class for processing IMAP email.

(To use with Gmail, enable IMAP access in your Google account settings.)

Usage with Gmail:

    import mailbox
    with mailbox.MailBox(gmail_username, gmail_password) as mbox:
        print(mbox.get_count('Inbox'))
        mbox.print_msgs()

For other IMAP servers, adjust IMAP_SERVER, IMAP_PORT and IMAP_USE_SSL
as necessary.
"""
import imaplib | |
import time | |
import uuid | |
from email import email | |
# Connection settings read by MailBox.__init__.
IMAP_SERVER = 'imap.gmail.com'
# imaplib documents the port as an integer; 993 is the standard IMAP-over-SSL port.
IMAP_PORT = 993
# When true MailBox connects with imaplib.IMAP4_SSL, otherwise with imaplib.IMAP4.
IMAP_USE_SSL = True
class MailBox(object):
    """Convenience wrapper around an ``imaplib`` connection.

    The connection is opened in ``__init__`` from the module-level
    ``IMAP_SERVER`` / ``IMAP_PORT`` / ``IMAP_USE_SSL`` settings. Login and
    logout are driven by the context-manager protocol, so the class is meant
    to be used in a ``with`` statement.
    """

    def __init__(self, user, password):
        """Store the credentials and open (but do not log in to) the server."""
        self.user = user
        self.password = password
        # Accept the port as either an int or a numeric string.
        port = int(IMAP_PORT)
        if IMAP_USE_SSL:
            self.imap = imaplib.IMAP4_SSL(IMAP_SERVER, port)
        else:
            self.imap = imaplib.IMAP4(IMAP_SERVER, port)

    def __enter__(self):
        """Log in with the stored credentials and return ``self``."""
        try:
            self.imap.login(self.user, self.password)
        except Exception:
            # __exit__ is not invoked when __enter__ raises, so the open
            # connection has to be released here before re-raising.
            self._logout_quietly()
            raise
        return self

    def __exit__(self, type, value, traceback):
        """Log out; exceptions from the ``with`` body are not suppressed."""
        # close() is deliberately not called: it is only valid while a
        # mailbox is selected.
        self.imap.logout()

    def _logout_quietly(self):
        """Best-effort logout used while another error is already propagating."""
        try:
            self.imap.logout()
        except (imaplib.IMAP4.error, EnvironmentError):
            # The login failure being re-raised by the caller is the error
            # worth reporting; a secondary logout failure is ignored.
            pass

    @staticmethod
    def _to_text(value):
        """Return *value* as native text, decoding bytes on Python 3."""
        # On Python 2 ``bytes is str``, so the second test keeps it untouched.
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode('utf-8', 'replace')
        return value

    @staticmethod
    def _message_set(num):
        """Return *num* (int, str or bytes) in a form imaplib commands accept."""
        if isinstance(num, (bytes, str)):
            return num
        return str(num)

    @staticmethod
    def _message_nums(data):
        """Split a SEARCH response into message numbers.

        Returns an empty list when the response carries no data
        (``[]`` or ``[None]``).
        """
        if not data or data[0] is None:
            return []
        return data[0].split()

    @staticmethod
    def _parse_message(raw):
        """Parse raw RFC 822 content into an ``email`` message object."""
        import email
        parse_bytes = getattr(email, 'message_from_bytes', None)
        # Python 3 delivers bytes and needs message_from_bytes; Python 2 has
        # no such function and its str payload goes to message_from_string.
        if parse_bytes is not None and isinstance(raw, bytes):
            return parse_bytes(raw)
        return email.message_from_string(raw)

    def _fetch_message(self, num):
        """Fetch message *num* from the selected folder, or None if absent."""
        status, data = self.imap.fetch(self._message_set(num), '(RFC822)')
        # A successful fetch yields a (header, payload) tuple first; anything
        # else means the server returned no message content.
        if not data or not isinstance(data[0], tuple):
            return None
        return self._parse_message(data[0][1])

    def get_count(self, folder):
        """Return the number of messages in *folder*."""
        self.imap.select(folder)
        status, data = self.imap.search(None, 'ALL')
        return len(self._message_nums(data))

    def get_folders(self):
        """Return the folder names reported by the server's LIST command.

        Names are returned as text exactly as they appear in the response,
        including surrounding double quotes when the server quotes them.
        Lines that cannot be parsed are skipped.
        """
        import re
        # LIST line layout: (flags) <delimiter|NIL> <name>
        line_re = re.compile(
            r'\((?P<flags>[^)]*)\) (?P<delim>NIL|"(?:\\.|[^"\\])*") (?P<name>.*)$')
        f_list = []
        for entry in self.imap.list()[1]:
            if entry is None:
                continue
            if isinstance(entry, tuple):
                # Name sent as an IMAP literal: imaplib hands back
                # (header, literal) and the literal is the folder name.
                f_list.append(self._to_text(entry[1]))
                continue
            match = line_re.match(self._to_text(entry))
            if match:
                f_list.append(match.group('name'))
        return f_list

    def fetch_message(self, num, folder='Inbox'):
        """Return message *num* of *folder* as an ``email`` message.

        Returns None when the server returns no content for *num*.
        """
        self.imap.select(folder)
        return self._fetch_message(num)

    def get_message_nums(self, folder):
        """Return a list of the message numbers in *folder*, newest first."""
        self.imap.select(folder)
        status, data = self.imap.search(None, 'ALL')
        return self._message_nums(data)[::-1]

    def delete_message(self, num, folder='Inbox'):
        """Flag message *num* of *folder* as deleted and expunge the folder."""
        self.imap.select(folder)
        self.imap.store(self._message_set(num), '+FLAGS', r'\Deleted')
        self.imap.expunge()

    def delete_all(self, folder):
        """Delete every message in *folder*."""
        self.imap.select(folder)
        status, data = self.imap.search(None, 'ALL')
        for num in self._message_nums(data):
            self.imap.store(num, '+FLAGS', r'\Deleted')
        # Expunge once, after flagging: expunging inside the loop would
        # renumber the remaining messages.
        self.imap.expunge()
        print("Deleted All Messages From '" + folder + "'")

    def delete_older_than(self, folder, datestr):
        """Delete the messages of *folder* matching the search criterion.

        *datestr* is passed to SEARCH unchanged, so it must be a complete
        IMAP criterion such as ``'(BEFORE 20-Mar-2014)'``.
        """
        self.imap.select(folder)
        typ, data = self.imap.search(None, datestr)
        for num in self._message_nums(data):
            self.imap.store(num, '+FLAGS', r'\Deleted')
        # Flagging alone does not remove anything; expunge as delete_all does.
        self.imap.expunge()
        print("Deleted All Messages From '" + folder + "' " + datestr)

    def print_msgs(self, folder='Inbox'):
        """Print every message of *folder*, newest first."""
        self.imap.select(folder)
        status, data = self.imap.search(None, 'ALL')
        for num in reversed(self._message_nums(data)):
            status, fetched = self.imap.fetch(num, '(RFC822)')
            if not fetched or not isinstance(fetched[0], tuple):
                continue
            print('Message %s\n%s\n' % (self._to_text(num),
                                        self._to_text(fetched[0][1])))

    def get_latest_email_sent_to(self, email_address, timeout=300, poll=1):
        """Poll the inbox until a message addressed to *email_address* exists.

        Returns the newest matching message. Raises AssertionError when none
        is found within *timeout* seconds; *poll* is the delay in seconds
        between attempts.
        """
        start_time = time.time()
        while (time.time() - start_time) < timeout:
            # It's no use continuing until we've successfully selected
            # the inbox. And if we don't select it on each iteration
            # before searching, we get intermittent failures.
            status, data = self.imap.select('Inbox')
            if status != 'OK':
                time.sleep(poll)
                continue
            status, data = self.imap.search(None, 'TO', email_address)
            if status == 'OK':
                for num in reversed(self._message_nums(data)):
                    email_msg = self._fetch_message(num)
                    if email_msg is not None:
                        return email_msg
            time.sleep(poll)
        raise AssertionError("No email sent to '%s' found in inbox "
                             "after polling for %s seconds." % (email_address, timeout))

    def delete_msgs_sent_to(self, email_address):
        """Delete every inbox message addressed to *email_address*."""
        self.imap.select('Inbox')
        status, data = self.imap.search(None, 'TO', email_address)
        if status == 'OK':
            # Flagging needs only the message number; the message itself is
            # not downloaded.
            for num in reversed(self._message_nums(data)):
                self.imap.store(num, '+FLAGS', r'\Deleted')
        self.imap.expunge()
if __name__ == '__main__':
    # Example usage. The credentials below are placeholders; replace them
    # with a real account before running.
    imap_username = '[email protected]'
    imap_password = 'password'
    with MailBox(imap_username, imap_password) as mbox:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(mbox.get_count('Inbox'))
        print("All Folders:")
        print(mbox.get_folders())
        # Further examples, left disabled (the first two delete mail):
        #   mbox.delete_all('[Gmail]/Starred')
        #   mbox.delete_older_than('Inbox', '(BEFORE 20-Mar-2014)')
        #   mbox.print_msgs()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment