Last active
April 19, 2016 20:57
-
-
Save vinovator/588abbeaebf7921037f3d74f47d66196 to your computer and use it in GitHub Desktop.
Mail bot script that sends predefined responses to predefined mails. Intended for a Raspberry Pi, which has its own dedicated mail id.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# mailbot.py
# python 2.7.6
"""
Mail bot script that sends predefined responses to predefined mails
Intended for raspberry pi, which has its dedicated mail id

Algorithm
1) Check a dedicated mailbox inbox for "unread" mails
2) For each "unread" mail, fetch the sender, subject and content
3) If the sender is from a pre-approved list, proceed
4) If the subject is from a pre-approved list, proceed
5) Optionally check for content as well, depending upon subject
6) Based on the subject determine the relevant "predefined" response
7) Send the response back to the pre-approved sender
8) Mark the mail as read
9) Configure the script in a cron job or windows scheduler to poll every X mins
"""
import imaplib | |
import smtplib | |
import email | |
from IGNORE import raspi_secrets # Untracked py to carry secrets | |
from UTILS import mailmap # Mapping between mail subject and response | |
# Gmail endpoints: SMTP is used for replying, IMAP for reading the inbox
SMTP_server = "smtp.gmail.com"
IMAP_server = "imap.gmail.com"

# Subject -> response lookup table, maintained in UTILS/mailmap.py
mail_map = mailmap.mail_map

# Mail ids the bot is allowed to answer
authorized_senders = raspi_secrets.authorized_senders

# Credentials of the bot's own mailbox, read from the untracked secrets module
pwd = raspi_secrets.login["password"]
mail_id = raspi_secrets.login["mail"]
def imap_login():
    """ Open an SSL connection to the IMAP server and log in

    Returns a (status, imap) tuple. On success status is "OK" and imap is
    the logged-in connection. On any failure status is a descriptive
    message and imap is None; the connection, if one was opened, is
    released before returning so that no socket is left dangling.
    """
    # Connecting can fail as well (DNS, network, TLS); report that through
    # the same (status, None) channel the callers already check.
    try:
        imap = imaplib.IMAP4_SSL(IMAP_server)
    except (imaplib.IMAP4.error, EnvironmentError) as e:
        return "connection exception: " + str(e), None

    # Login to IMAP server
    try:
        status, _ = imap.login(mail_id, pwd)
    except imaplib.IMAP4.error as e:
        failure = "login exception: " + str(e)
    else:
        if status == "OK":
            return status, imap
        failure = "login failure"

    # Login did not succeed: release the connection (best effort, the
    # login failure is the error worth reporting)
    try:
        imap.logout()
    except (imaplib.IMAP4.error, EnvironmentError):
        pass
    return failure, None
def get_unread_mail(imap):
    """ Select the inbox and search it for unseen mails

    Returns the (status, messages) pair of the search, or
    (select status, None) when the inbox could not be selected.
    """
    folder_state, _ = imap.select("Inbox")
    if folder_state == "OK":
        # Inbox is open, ask the server for the unseen message numbers
        search_state, unseen = imap.search(None, "(UNSEEN)")
        return search_state, unseen
    return folder_state, None
def is_authorized(mail_id):
    """ Check if the given mail id is from pre-authorized list

    mail_id is the raw "From" header value, e.g. "Name <user@host>".
    Returns True only when the address part is in authorized_senders.
    A missing or empty header, or one with no parsable address, is
    never authorized.
    """
    # A mail without a From header yields None; reject it up front instead
    # of handing None to parseaddr
    if not mail_id:
        return False

    address = email.utils.parseaddr(mail_id)[1]
    if not address:
        # parseaddr signals "could not parse" with an empty address
        return False

    # TODO: Notify owner when an unauthorized sender is seen
    return address in authorized_senders
def process_mail(message_data):
    """ Extract sender and subject from the fetched mail data

    Only the first tuple section of message_data is looked at. Returns
    (sender, subject) when the sender is authorized, otherwise None.
    """
    for part in message_data:
        if not isinstance(part, tuple):
            # non-tuple sections carry no message content
            continue
        parsed = email.message_from_string(part[1])
        origin, title = parsed["From"], parsed["Subject"]
        return (origin, title) if is_authorized(origin) else None
def smtp_login():
    """ Login to smtp server

    Returns the connected, TLS-protected and authenticated SMTP object.
    Any exception raised during the handshake or login is propagated to
    the caller after the connection has been closed.
    """
    # 587 is the submission port; encryption is started by starttls() below
    smtpObj = smtplib.SMTP(SMTP_server, 587)
    try:
        # Send a hello message to establish connection, returns 250 if success
        smtpObj.ehlo()
        smtpObj.starttls()  # start TLS encryption
        # login to server, 235 is returned if success
        smtpObj.login(mail_id, pwd)
    except Exception:
        # Do not leak the socket when the handshake or login fails
        smtpObj.close()
        raise
    return smtpObj
def send_mail(smtpObj, mail_id, recieved_subject):
    """ Given the mail subject, provide pre-defined response

    smtpObj          -- logged-in SMTP connection
    mail_id          -- "From" header of the mail being answered; the
                        response is sent to the address it contains
    recieved_subject -- key into mail_map selecting the response

    Raises KeyError when recieved_subject is not a key of mail_map.
    """
    # Imported here so the block does not depend on the file's import list
    from email.mime.text import MIMEText

    to_name, to_email = email.utils.parseaddr(mail_id)

    # The mail_id parameter shadows the module-level mail_id, so the bot's
    # own address is read from the secrets module. It, not the recipient,
    # must be the envelope sender.
    from_email = raspi_secrets.login["mail"]

    mail_subject = mail_map[recieved_subject]
    mail_body = "Hello {0}, {1}. Cheers".format(to_name, mail_subject)

    # Build a well-formed message: proper headers, the blank line between
    # header and body, and an encoding that copes with non-ASCII names
    message = MIMEText(mail_body, "plain", "utf-8")
    message["Subject"] = mail_subject
    message["From"] = from_email
    message["To"] = to_email

    smtpObj.sendmail(from_email, to_email, message.as_string())
if __name__ == "__main__":
    # Starting block: poll the inbox once, answer every recognised mail
    # from an authorized sender, then report any error that occurred
    error_message = ""
    send_mail_queue = []

    # login to the IMAP server
    login_status, imapObj = imap_login()
    if login_status != "OK":
        # No connection available, so nothing below may touch imapObj
        error_message = login_status
    else:
        try:
            # Fetch unread mail ids
            mail_status, messages = get_unread_mail(imapObj)
            if mail_status != "OK":
                error_message = mail_status
            else:
                # e.g. for 2 mails ['1 2']; tolerate an empty/None reply
                if messages and messages[0]:
                    unread_ids = messages[0].split()
                else:
                    unread_ids = []
                # Get data from each unread mail message
                for mail_num in unread_ids:
                    message_code, message_data = imapObj.fetch(mail_num,
                                                               "(RFC822)")
                    if message_code == "OK":
                        # returns tuple - mail_id, subject
                        entry = process_mail(message_data)
                        if entry:
                            send_mail_queue.append(entry)
        finally:
            # Close and logout of IMAP server, whatever happened above.
            # close() is rejected when no mailbox was selected; that is
            # harmless, the logout still has to happen.
            try:
                imapObj.close()
            except imaplib.IMAP4.error:
                pass
            imapObj.logout()

    # Login to SMTP server only when there is something to send
    if send_mail_queue:
        smtp = smtp_login()
        try:
            # Send mail response from SMTP server.
            # The loop names deliberately differ from the module-level
            # mail_id so the bot's own login id is not overwritten.
            for sender, received_subject in send_mail_queue:
                # A mail without a Subject header yields None
                subject = (received_subject or "").lower()
                if subject in mail_map:
                    print("Dict match found")
                    send_mail(smtp, sender, subject)
                # TODO: Notify sender about unrecognized subject
        finally:
            # Finally disconnect from SMTP server
            smtp.quit()

    # Finally if any error message, print it
    if error_message:
        print(error_message)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# mailbot_py3.py
# python 3.5
"""
Mail bot script that sends predefined responses to predefined mails
Intended for raspberry pi, which has its dedicated mail id

Algorithm
1) Check a dedicated mailbox inbox for "unread" mails
2) For each "unread" mail, fetch the sender, subject and content
3) If the sender is from a pre-approved list, proceed
4) If the subject is from a pre-approved list, proceed
5) Optionally check for content as well, depending upon subject
6) Based on the subject determine the relevant "predefined" response
7) Send the response back to the pre-approved sender
8) Mark the mail as read
9) Configure the script in a cron job or windows scheduler to poll every X mins
"""
import imaplib | |
import smtplib | |
import email | |
from IGNORE import raspi_secrets # Untracked py to carry secrets | |
from UTILS import mailmap # Mapping between mail subject and response | |
# Mail servers of the Google account the bot runs on
IMAP_server = "imap.gmail.com"  # incoming mail
SMTP_server = "smtp.gmail.com"  # outgoing mail

# Bot credentials, taken from the untracked secrets module
mail_id = raspi_secrets.login["mail"]
pwd = raspi_secrets.login["password"]

# Lookup tables: subject -> response pairs, and the authorized mail ids
mail_map = mailmap.mail_map
authorized_senders = raspi_secrets.authorized_senders
def imap_login():
    """ Connect to the IMAP server over SSL and authenticate

    Returns a (status, imap) tuple: ("OK", connection) on success,
    otherwise (error description, None). When the login is refused the
    already opened connection is shut down before returning.
    """
    # Instantiate IMAP interface; network and TLS problems surface here
    # and are reported as a status instead of an uncaught traceback
    try:
        imap = imaplib.IMAP4_SSL(IMAP_server)
    except (imaplib.IMAP4.error, OSError) as e:
        return "connection exception: " + str(e), None

    # Login to IMAP server
    try:
        status, _ = imap.login(mail_id, pwd)
    except imaplib.IMAP4.error as e:
        failure = "login exception: " + str(e)
    else:
        if status == "OK":
            return status, imap
        failure = "login failure"

    # Not authenticated: give the socket back (best effort, the login
    # failure is what gets reported)
    try:
        imap.logout()
    except (imaplib.IMAP4.error, OSError):
        pass
    return failure, None
def get_unread_mail(imap):
    """ Open the INBOX folder and list its unseen mails

    Returns (status, messages) as delivered by the search, or
    (select status, None) if the folder could not be selected.
    """
    select_result, _ = imap.select(mailbox="INBOX")
    if select_result != "OK":
        # Without a selected folder there is nothing to search
        return select_result, None

    search_result, found = imap.search(None, "(UNSEEN)")
    return search_result, found
def is_authorized(mail_id):
    """ Tell whether a "From" header belongs to a pre-authorized sender

    mail_id is the raw header value ("Name <user@host>" or a bare
    address). Returns True when its address part is listed in
    authorized_senders, False otherwise. That includes a missing or
    empty header and a value with no parsable address.
    """
    if not mail_id:
        # No From header at all (None) or an empty one: nothing to check
        return False

    _, address = email.utils.parseaddr(mail_id)
    if not address:
        # parseaddr returns an empty address when parsing fails
        return False

    # TODO: Notify owner about mails from unauthorized senders
    return address in authorized_senders
def process_mail(message_data):
    """ Read the sender and subject headers of one fetched mail

    The first tuple section of message_data holds the raw message bytes.
    Returns (sender, subject) for an authorized sender, None otherwise
    (also when there is no tuple section at all).
    """
    payload = next(
        (item for item in message_data if isinstance(item, tuple)), None)
    if payload is None:
        return None

    # for python 2.x, change this to message_from_string
    mail = email.message_from_bytes(payload[1])
    from_header = mail["From"]
    subject_header = mail["Subject"]
    if not is_authorized(from_header):
        return None
    return (from_header, subject_header)
def smtp_login():
    """ Connect to the SMTP server, switch to TLS and authenticate

    Returns the ready-to-use SMTP object. If any step after connecting
    raises, the connection is closed and the exception is re-raised.
    """
    # 587 is the submission port; the session is encrypted by starttls()
    smtpObj = smtplib.SMTP(SMTP_server, 587)
    try:
        # Send a hello message to establish connection, returns 250 if success
        smtpObj.ehlo()
        smtpObj.starttls()  # start TLS encryption
        # login to server, 235 is returned if success
        smtpObj.login(mail_id, pwd)
    except Exception:
        # Release the socket before passing the failure on
        smtpObj.close()
        raise
    return smtpObj
def send_mail(smtpObj, mail_id, recieved_subject):
    """ Send the pre-defined response that belongs to a mail subject

    smtpObj          -- logged-in SMTP connection
    mail_id          -- "From" header of the mail being answered; the
                        response goes to the address it contains
    recieved_subject -- key into mail_map selecting the response

    Raises KeyError when recieved_subject is not a key of mail_map.
    """
    # Imported here so the block does not depend on the file's import list
    from email.mime.text import MIMEText

    to_name, to_email = email.utils.parseaddr(mail_id)

    # The mail_id parameter hides the module-level mail_id, so the bot's
    # own address is looked up in the secrets module. The envelope sender
    # must be the bot, not the person being answered.
    from_email = raspi_secrets.login["mail"]

    mail_subject = mail_map[recieved_subject]
    mail_body = "Hello {0}, {1}. Cheers".format(to_name, mail_subject)

    # A MIME message provides correct headers, the mandatory blank line
    # before the body and an ASCII-safe encoding, so a non-ASCII sender
    # name no longer makes sendmail() raise UnicodeEncodeError
    message = MIMEText(mail_body, "plain", "utf-8")
    message["Subject"] = mail_subject
    message["From"] = from_email
    message["To"] = to_email

    smtpObj.sendmail(from_email, to_email, message.as_string())
def main():
    """ Script that controls all function calls

    Collects the unread mails of authorized senders, then answers every
    mail whose lower-cased subject is a key of mail_map.

    Returns an error message (str) when the IMAP login or the inbox
    lookup fails, otherwise None. The IMAP connection is always logged
    out and the SMTP connection always quit, even when an error occurs.
    """
    send_mail_queue = []

    # login to the IMAP server
    login_status, imapObj = imap_login()
    if login_status != "OK":
        return login_status

    try:
        # Fetch unread mail ids
        mail_status, messages = get_unread_mail(imapObj)
        if mail_status != "OK":
            return mail_status  # the finally clause still logs out

        # e.g. for 2 mails [b'1 2']; tolerate an empty/None reply
        unread_ids = messages[0].split() if messages and messages[0] else []

        # Get data from each unread mail message
        for mail_num in unread_ids:
            message_code, message_data = imapObj.fetch(mail_num, "(RFC822)")
            if message_code == "OK":
                # returns tuple - mail_id, subject
                entry = process_mail(message_data)
                if entry:
                    send_mail_queue.append(entry)
    finally:
        # Close and logout of IMAP server. close() is rejected when no
        # mailbox was selected; ignore that, logout is what matters.
        try:
            imapObj.close()
        except imaplib.IMAP4.error:
            pass
        imapObj.logout()

    # Nothing to answer: do not open an SMTP connection at all
    if not send_mail_queue:
        return None

    # Login to SMTP server
    smtp = smtp_login()
    try:
        # Send mail response from SMTP server
        for sender, received_subject in send_mail_queue:
            # A mail without a Subject header yields None
            subject = (received_subject or "").lower()
            if subject in mail_map:
                send_mail(smtp, sender, subject)
            # TODO: Notify sender about unrecognized subject
    finally:
        # Finally disconnect from SMTP server
        smtp.quit()

    return None
if __name__ == "__main__":
    # Entry point: run the bot once; main() hands back an error message
    # only when something went wrong, and that message is printed
    error_message = main()
    if not error_message:
        pass
    else:
        print(error_message)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment