Created
March 16, 2017 17:05
-
-
Save bsquidwrd/0f82bba5e73f1fcf532fc80620cdff3d to your computer and use it in GitHub Desktop.
Fetch emails with IMAP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import email | |
import imaplib | |
import os | |
class FetchEmail():
    """
    Small helper around an IMAP4-over-SSL connection that fetches unread
    messages from the default mailbox and saves their attachments to disk.

    Attributes:
        connection: the ``imaplib.IMAP4_SSL`` object created in ``__init__``.
        error: ``None`` until an operation fails, then a short description.
    """

    connection = None
    error = None

    def __init__(self, mail_server, username, password):
        """
        Connect to ``mail_server`` over SSL, log in and select the default
        mailbox in read-write mode.
        """
        self.connection = imaplib.IMAP4_SSL(mail_server)
        self.connection.login(username, password)
        self.connection.select(readonly=False)  # so we can mark mails as read

    def close_connection(self):
        """
        Close the connection to the IMAP server
        """
        # close() only deselects the mailbox; logout() is what actually
        # shuts the connection down, so always run it, even if close() fails.
        try:
            self.connection.close()
        finally:
            self.connection.logout()

    def save_attachment(self, msg, download_folder="/tmp"):
        """
        Given a message, save its attachments to the specified
        download folder (default is /tmp)

        Parts without a filename or without a decodable payload are skipped.
        Only the base name of the attachment's filename is used, so a name
        such as ``../x`` cannot escape ``download_folder``. A file that
        already exists is not overwritten.

        return: file path of the last attachment found, or the string
                "No attachment found." when the message has none
        """
        att_path = "No attachment found."
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get('Content-Disposition') is None:
                continue

            filename = part.get_filename()
            if not filename:
                # e.g. "Content-Disposition: inline" with no filename
                continue

            # The filename comes from the (untrusted) mail; drop any
            # directory components, whichever separator was used.
            safe_name = os.path.basename(filename.replace('\\', '/'))
            if not safe_name or safe_name in ('.', '..'):
                continue

            payload = part.get_payload(decode=True)
            if payload is None:
                continue

            att_path = os.path.join(download_folder, safe_name)
            if not os.path.isfile(att_path):
                with open(att_path, 'wb') as fp:
                    fp.write(payload)
        return att_path

    def fetch_unread_messages(self):
        """
        Retrieve unread messages

        Each successfully fetched message is flagged as seen on the server.

        return: list of ``email.message.Message`` objects; empty when there
                is nothing unread. If the search fails, or a fetch raises an
                IMAP error, ``self.error`` is set and the messages collected
                so far are returned.
        """
        emails = []
        (result, messages) = self.connection.search(None, 'UnSeen')
        if result != "OK":
            self.error = "Failed to retreive emails."
            return emails

        # An empty search result is b'' (or None); split() without an
        # argument then yields no ids instead of a single empty string.
        id_blob = messages[0] if messages else None
        message_ids = id_blob.decode('utf-8').split() if id_blob else []

        for message_id in message_ids:
            try:
                ret, data = self.connection.fetch(message_id, '(RFC822)')
            except imaplib.IMAP4.error as exc:
                self.error = "Failed to fetch message %s: %s" % (message_id, exc)
                break
            if ret != "OK":
                continue

            # The message body is the second item of the tuple in the
            # response; other items can be plain bytes or None.
            raw = next(
                (item[1] for item in data
                 if isinstance(item, tuple) and len(item) > 1),
                None
            )
            if raw is None:
                continue

            # Parse from bytes so non-UTF-8 messages do not raise on decode.
            emails.append(email.message_from_bytes(raw))
            self.connection.store(message_id, '+FLAGS', '\\Seen')
        return emails

    def parse_email_address(self, email_address):
        """
        Helper function to parse out the email address from the message

        return: tuple (name, address). Eg. ('John Doe', 'jdoe@example.com')
        """
        # Imported here because "import email" alone does not guarantee
        # that the email.utils submodule has been loaded.
        from email.utils import parseaddr
        return parseaddr(email_address)
if __name__ == '__main__':
    # Example usage: fetch every unread message and save its attachment,
    # printing the path returned for each message.
    # NOTE(review): the credentials below look like placeholders - replace
    # them before running.
    f = FetchEmail(
        mail_server='imap.gmail.com',
        username='[email protected]',
        password='Password!'
    )
    try:
        msgs = f.fetch_unread_messages()
        for m in msgs:
            file_location = f.save_attachment(
                msg=m,
                download_folder='C:\\tmp'
            )
            print(file_location)
    finally:
        # Release the connection even if fetching or saving raised.
        f.close_connection()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment