-
-
Save TheAshwanik/6094515 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""MailBox class for processing IMAP email.

(To use with Gmail, enable IMAP access in your Google account settings.)

Usage with Gmail:

    import mailbox
    with mailbox.MailBox(gmail_username, gmail_password) as mbox:
        print(mbox.get_count())
        mbox.print_msgs()

For other IMAP servers, adjust the IMAP_* settings below as necessary.
"""
import imaplib | |
import time | |
import uuid | |
from email import email | |
# Default connection settings used by MailBox when it opens its connection.
IMAP_SERVER = 'imap.gmail.com'
# Kept as an integer, which is the port type imaplib documents.
IMAP_PORT = 993
# True selects imaplib.IMAP4_SSL, False selects plain imaplib.IMAP4.
IMAP_USE_SSL = True
class MailBox(object):
    """Context manager around an IMAP connection that works on the inbox.

    The constructor opens the connection, entering the ``with`` block logs
    in, and leaving it closes the selected mailbox (if any) and logs out.
    Every operation re-selects ``'Inbox'`` before talking to the server.
    """

    def __init__(self, user, password, server=None, port=None, use_ssl=None):
        """Open the IMAP connection (login happens in ``__enter__``).

        Args:
            user: account name later passed to ``login``.
            password: password later passed to ``login``.
            server: IMAP host; defaults to the module-level ``IMAP_SERVER``.
            port: IMAP port; defaults to the module-level ``IMAP_PORT``.
            use_ssl: use ``imaplib.IMAP4_SSL`` when true, ``imaplib.IMAP4``
                otherwise; defaults to the module-level ``IMAP_USE_SSL``.
        """
        self.user = user
        self.password = password
        # Defaults are looked up at call time (not in the signature) so the
        # module-level settings are honoured even if changed after import.
        if server is None:
            server = IMAP_SERVER
        if port is None:
            port = IMAP_PORT
        if use_ssl is None:
            use_ssl = IMAP_USE_SSL
        connect = imaplib.IMAP4_SSL if use_ssl else imaplib.IMAP4
        self.imap = connect(server, int(port))

    def __enter__(self):
        """Log in with the stored credentials and return this mailbox."""
        self.imap.login(self.user, self.password)
        return self

    def __exit__(self, type, value, traceback):
        """Close the selected mailbox, if any, and always log out."""
        try:
            # CLOSE is only legal while a mailbox is selected; issuing it in
            # any other state makes imaplib raise.
            if self.imap.state == 'SELECTED':
                self.imap.close()
        finally:
            self.imap.logout()

    @staticmethod
    def _to_text(value):
        """Return *value* as a native ``str``, decoding bytes if needed."""
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode('utf-8', 'replace')
        return str(value)

    @staticmethod
    def _parse_message(raw):
        """Parse raw RFC822 content into an ``email`` message object."""
        # Imported locally so parsing uses the stdlib package itself rather
        # than whatever the module-level name ``email`` is bound to.
        import email
        if isinstance(raw, str):
            return email.message_from_string(raw)
        return email.message_from_bytes(raw)

    def _search(self, *criteria):
        """Return matching message numbers (as ``str``), lowest first.

        Searches the currently selected mailbox; returns an empty list when
        the search does not answer ``'OK'`` or yields no data.
        """
        status, data = self.imap.search(None, *criteria)
        if status != 'OK' or not data or data[0] is None:
            return []
        return [self._to_text(num) for num in data[0].split()]

    def _fetch_raw(self, num):
        """Return the raw RFC822 content of message *num*, or ``None``."""
        status, data = self.imap.fetch(self._to_text(num), '(RFC822)')
        if status != 'OK':
            return None
        # The message body is carried in a (header, payload) tuple; other
        # response items are plain values or None.
        for part in data or ():
            if isinstance(part, tuple):
                return part[1]
        return None

    def get_count(self):
        """Return the number of messages in the inbox."""
        self.imap.select('Inbox')
        return len(self._search('ALL'))

    def fetch_message(self, num):
        """Return message number *num* from the inbox as a parsed message.

        Raises:
            ValueError: if the server returned no content for *num*.
        """
        self.imap.select('Inbox')
        raw = self._fetch_raw(num)
        if raw is None:
            raise ValueError("No message %s found in inbox." % (num,))
        return self._parse_message(raw)

    def delete_message(self, num):
        """Flag message number *num* as deleted and expunge the inbox."""
        self.imap.select('Inbox')
        self.imap.store(self._to_text(num), '+FLAGS', r'\Deleted')
        self.imap.expunge()

    def delete_all(self):
        """Flag every message in the inbox as deleted and expunge."""
        self.imap.select('Inbox')
        for num in self._search('ALL'):
            self.imap.store(num, '+FLAGS', r'\Deleted')
        self.imap.expunge()

    def print_msgs(self):
        """Print every inbox message, highest message number first."""
        self.imap.select('Inbox')
        for num in reversed(self._search('ALL')):
            raw = self._fetch_raw(num)
            if raw is None:
                continue
            print('Message %s\n%s\n' % (num, self._to_text(raw)))

    def get_latest_email_sent_to(self, email_address, timeout=300, poll=1):
        """Poll the inbox for the newest message addressed to *email_address*.

        Args:
            email_address: value used for the IMAP ``TO`` search criterion.
            timeout: seconds to keep polling before giving up.
            poll: seconds to sleep between attempts.

        Returns:
            The parsed message with the highest matching message number.

        Raises:
            AssertionError: if no matching message was retrieved in time.
        """
        start_time = time.time()
        while (time.time() - start_time) < timeout:
            # It's no use continuing until we've successfully selected
            # the inbox. And if we don't select it on each iteration
            # before searching, we get intermittent failures.
            status, _ = self.imap.select('Inbox')
            if status == 'OK':
                nums = self._search('TO', email_address)
                if nums:
                    raw = self._fetch_raw(nums[-1])
                    if raw is not None:
                        return self._parse_message(raw)
            time.sleep(poll)
        raise AssertionError("No email sent to '%s' found in inbox "
                             "after polling for %s seconds." % (email_address, timeout))

    def delete_msgs_sent_to(self, email_address):
        """Delete every inbox message addressed to *email_address*."""
        self.imap.select('Inbox')
        # Flagging needs only the message numbers, so the messages
        # themselves are not downloaded first.
        for num in reversed(self._search('TO', email_address)):
            self.imap.store(num, '+FLAGS', r'\Deleted')
        self.imap.expunge()
if __name__ == '__main__':
    # Example run: replace these placeholder credentials with real ones.
    imap_username = 'username@gmail.com'
    imap_password = 'secret'
    with MailBox(imap_username, imap_password) as mbox:
        print(mbox.get_count())
        # print_msgs() does its own printing and has no return value, so
        # wrapping it in print would emit a stray "None".
        mbox.print_msgs()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment