Created
December 18, 2014 22:19
-
-
Save johnpaulhayes/106c3e40dc04b6a6b516 to your computer and use it in GitHub Desktop.
Python interface for downloading the attachments of unread emails over IMAP.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import email
import email.utils
import imaplib
import os
class PortalEmail(object):
    """
    Interface for downloading the attachments of unread emails from an
    IMAP mailbox and saving them to a folder.

    Instances can be used as a context manager; the connection is closed
    when the ``with`` block exits::

        with PortalEmail(server, user, password) as portal:
            for msg in portal.fetch_unread_messages():
                portal.save_attachment(msg)

    Attributes:
        connection: the ``imaplib.IMAP4_SSL`` connection opened by
            ``__init__``.
        error: human-readable description of the last failure recorded by
            ``fetch_unread_messages``, or ``None``.
    """

    connection = None
    error = None

    def __init__(self, mail_server, username, password):
        """
        Connect to ``mail_server`` over SSL, log in and select the default
        mailbox in read-write mode.

        Any exception raised while logging in or selecting the mailbox is
        re-raised after the connection has been shut down.
        """
        self.error = None
        self.connection = imaplib.IMAP4_SSL(mail_server)
        try:
            self.connection.login(username, password)
            # Read-write so fetched mails can be marked as read.
            self.connection.select(readonly=False)
        except Exception:
            # Do not leave the socket open when setup fails.
            try:
                self.connection.logout()
            except Exception:
                # Best-effort cleanup; the original error is what matters.
                pass
            raise

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; exceptions from the block propagate."""
        self.close_connection()
        return False

    def close_connection(self):
        """
        Close the selected mailbox and log out of the IMAP server.
        """
        try:
            self.connection.close()
        finally:
            # close() only deselects the mailbox; logout() ends the session.
            self.connection.logout()

    def save_attachment(self, msg, download_folder="/tmp"):
        """
        Given a message, save its attachments to the specified
        download folder (default is /tmp). Existing files are not
        overwritten.

        return: file path of the last attachment found, or the string
                "No attachment found." when the message has none.
        """
        att_path = "No attachment found."
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get('Content-Disposition') is None:
                continue
            filename = part.get_filename()
            if not filename:
                # e.g. "Content-Disposition: inline" without a filename.
                continue
            # The filename is supplied by the sender: keep only its final
            # component so it cannot escape download_folder.
            safe_name = os.path.basename(filename.replace('\\', '/'))
            if safe_name in ('', '.', '..'):
                continue
            payload = part.get_payload(decode=True)
            if payload is None:
                # Nothing decodable to write for this part.
                continue
            att_path = os.path.join(download_folder, safe_name)
            if not os.path.isfile(att_path):
                with open(att_path, 'wb') as fp:
                    fp.write(payload)
        return att_path

    def fetch_unread_messages(self):
        """
        Retrieve unread messages and flag each retrieved one as seen.

        return: list of ``email.message.Message`` objects; empty when there
                is no unread mail. When the search fails, or a fetch raises
                an IMAP error, ``self.error`` is set and the messages
                collected so far are returned.
        """
        emails = []
        result, messages = self.connection.search(None, 'UnSeen')
        if result != "OK":
            self.error = "Failed to retreive emails."
            return emails
        # The ids arrive as one whitespace-separated string. split() with no
        # argument yields [] for an empty result and accepts str and bytes.
        if messages and messages[0]:
            message_ids = messages[0].split()
        else:
            message_ids = []
        for message_id in message_ids:
            try:
                status, data = self.connection.fetch(message_id, '(RFC822)')
            except imaplib.IMAP4.error as exc:
                self.error = "Failed to fetch message {0}: {1}".format(
                    message_id, exc)
                break
            if status != "OK" or not data or not isinstance(data[0], tuple):
                # No message body in the response; skip this id.
                continue
            emails.append(self._parse_raw_message(data[0][1]))
            self.connection.store(message_id, '+FLAGS', '\\Seen')
        return emails

    @staticmethod
    def _parse_raw_message(raw):
        """Parse raw RFC 822 text (str or bytes) into a message object."""
        if not isinstance(raw, str) and hasattr(email, 'message_from_bytes'):
            return email.message_from_bytes(raw)
        return email.message_from_string(raw)

    def parse_email_address(self, email_address):
        """
        Helper function to parse out the email address from the message

        return: tuple (name, address).
                Eg. ('John Doe', 'john.doe@example.com')
        """
        return email.utils.parseaddr(email_address)
""" Usage """ | |
# Usage example. The server name, username and password below are
# placeholders; substitute real values before running. Guarded so that
# importing this module does not open a network connection.
if __name__ == "__main__":
    portal_email = PortalEmail('a.mail.server',
                               '[email protected]',
                               'a refreshingly long and secure password should go here!!')
    try:
        emails = portal_email.fetch_unread_messages()
        for m in emails:
            # Prints the path of the saved attachment, or a "not found" note.
            print(portal_email.save_attachment(m))
        print("Processed {0}".format(len(emails)))
    finally:
        # Always release the connection, even if processing fails.
        portal_email.close_connection()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment