Last active
November 2, 2023 20:00
-
-
Save praul/68710409583eb19c8c1b12c12a9302ff to your computer and use it in GitHub Desktop.
Python out-of-office notice with rebound protection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Automatically replies to mails both unread and unanswered. | |
This script has a rebound and self mail protection. Also a rate limit protection. | |
You can adjust the time value with the variable blockhours | |
Thanks to for original gist: https://gist.github.com/BertrandBordage/e07e5fe8191590daafafe0dbb05b5a7b | |
WARNING: This answers to any both unread and unanswered mail, even if it is years old. | |
Don’t use on a mailbox with old messages left unread and unanswered. | |
Simply subclass ``AutoReplyer``, define the undefined class attribute, | |
and call the ``run`` method on an instance. This loops until you stop the script | |
(using Ctrl+C, typically) or until an error occurs, like a network failure. | |
Example usage: | |
from autoreplyer import AutoReplyer | |
class YourAutoReplyer(AutoReplyer): | |
imap_server = 'your.imap.server' | |
imap_port = 1234 # Custom port, only use if the default one doesn’t work. | |
imap_user = '[email protected]' | |
imap_password = 'your_password' | |
smtp_server = 'your.smtp.server' | |
smtp_port = 5678 # Custom port, only use if the default one doesn’t work. | |
smtp_user = '[email protected]' | |
smtp_password = 'your_password' | |
blockhours = 12 #The time in which autoresponder does not respond. Rebound Protection | |
mymail = '[email protected]' | |
from_address = 'Your Displayed Name <[email protected]>' | |
body = ''' | |
Hello, | |
I’m on vacation until a date I should mention here. | |
You’ll have an answer when I’m back. | |
Call Django in case of emergency. | |
Have a nice day, | |
You | |
''' | |
body_html = ''' | |
<p>Hello</p>, | |
<p> | |
I’m on vacation until a date I should mention here.<br /> | |
You’ll have an answer when I’m back.<br /> | |
Call <a href="tel:+1234567890">Django</a> in case of emergency. | |
</p> | |
<p> | |
Have a nice day,<br /> | |
You | |
</p> | |
''' | |
while True: | |
try: | |
YourAutoReplyer().run() | |
except KeyboardInterrupt: | |
exit() | |
""" | |
from email import message_from_bytes | |
from email.mime.multipart import MIMEMultipart | |
from email.mime.text import MIMEText | |
from email.utils import make_msgid | |
from imaplib import IMAP4, IMAP4_SSL, IMAP4_PORT, IMAP4_SSL_PORT | |
from os import execlp | |
from smtplib import SMTP, SMTP_SSL, SMTP_PORT, SMTP_SSL_PORT | |
from subprocess import call | |
from textwrap import dedent | |
from time import sleep | |
import sqlite3 | |
from datetime import datetime, timedelta | |
import time | |
__author__ = 'Bertrand Bordage (modified: praul)' | |
__copyright__ = 'Copyright © 2016 Bertrand Bordage' | |
__license__ = 'MIT' | |
class AutoReplyer: | |
refresh_delay = 60 # seconds | |
imap_server = None | |
imap_use_ssl = False | |
imap_port = IMAP4_PORT | |
imap_ssl_port = IMAP4_SSL_PORT | |
imap_user = None | |
imap_password = None | |
smtp_server = None | |
smtp_use_ssl = False | |
smtp_port = SMTP_PORT | |
smtp_ssl_port = SMTP_SSL_PORT | |
smtp_user = None | |
smtp_password = None | |
from_address = None | |
body = None | |
body_html = None | |
con = None | |
cur = None | |
blockhours = None | |
mymail = None | |
def __init__(self): | |
self.login() | |
def login(self): | |
if self.imap_use_ssl: | |
self.imap = IMAP4_SSL(self.imap_server, self.imap_ssl_port) | |
else: | |
self.imap = IMAP4(self.imap_server, self.imap_port) | |
self.imap.login(self.imap_user, self.imap_password) | |
if self.smtp_use_ssl: | |
self.smtp = SMTP_SSL(self.smtp_server, self.smtp_ssl_port) | |
else: | |
self.smtp = SMTP(self.smtp_server, self.smtp_port) | |
self.smtp.login(self.smtp_user, self.smtp_password) | |
def close(self): | |
self.smtp.close() | |
self.imap.logout() | |
def check_reply_timeout(self, sender): | |
print ('-------------------------------------------------------') | |
print ('Incoming mail from ' + sender + '. Checking history....') | |
#Check for mail from self | |
if (sender == self.mymail): | |
print ('Mail from self. Not sending any mail') | |
return False | |
#Check for recent incoming mails from this adress | |
self.db_connect() | |
send = True | |
for row in self.cur.execute("SELECT id,date FROM senders WHERE mail=?", (sender,)): | |
breakdate = datetime.now() - timedelta(hours=self.blockhours) | |
then = datetime.strptime(row[1], "%Y-%m-%d %H:%M:%S.%f") | |
print ('Found ' + sender + ' at ' + str(row[1]) + ' - ID ' + str(row[0])) | |
if (then < breakdate): #If older: Delete | |
print ('Last entry ' + str(row[0]) + ' from ' + sender + ' is old. Delete...' ) | |
self.cur.execute("DELETE FROM senders WHERE id=?", (str(row[0]))) | |
elif (then >= breakdate): #If Recent: Reject | |
print ('Recent entry found. Not sending any mail' ) | |
send = False | |
if (send == False): return | |
#Accept | |
print ('Memorizing ' + sender) | |
self.cur.execute("INSERT INTO senders (mail, date) values (?, ?)", (sender, datetime.now() ) ) | |
self.con.commit() | |
self.con.close() | |
return True | |
def db_connect(self): | |
self.con = sqlite3.connect('autoreply.db', detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES) | |
#self.con.set_trace_callback(print) | |
self.cur = self.con.cursor() | |
def create_table(self): | |
self.db_connect() | |
try: self.cur.execute('''CREATE TABLE senders (id INTEGER PRIMARY KEY, mail text, date datetime)''') | |
except: pass | |
self.con.commit() | |
self.con.close() | |
def create_auto_reply(self, original): | |
mail = MIMEMultipart('alternative') | |
mail['Message-ID'] = make_msgid() | |
mail['References'] = mail['In-Reply-To'] = original['Message-ID'] | |
mail['Subject'] = 'Re: ' + original['Subject'] | |
mail['From'] = self.from_address | |
mail['To'] = original['Reply-To'] or original['From'] | |
mail.attach(MIMEText(dedent(self.body), 'plain')) | |
mail.attach(MIMEText(self.body_html, 'html')) | |
return mail | |
def send_auto_reply(self, original): | |
#Check if adress has been used 12h | |
senderfull = original['From'] | |
sender = (senderfull.split('<'))[1].split('>')[0] | |
print (sender) | |
if (self.check_reply_timeout(sender) != True): return | |
#Send with Rate limit & Error prevention prevention | |
success = False | |
for i in range(5): | |
if success== False: | |
try: | |
self.smtp.sendmail( | |
self.from_address, [original['From']], | |
self.create_auto_reply(original).as_bytes()) | |
success = True | |
log = 'Replied to “%s” for the mail “%s”' % (original['From'], | |
original['Subject']) | |
print(log) | |
break | |
except: | |
self.close() | |
print ('Error on send (rate limit?). Wait 30s and reconnect....') | |
time.sleep(30) | |
self.login() | |
try: | |
call(['notify-send', log]) | |
except FileNotFoundError: | |
pass | |
def reply(self, mail_number): | |
self.imap.select(readonly=True) | |
_, data = self.imap.fetch(mail_number, '(RFC822)') | |
self.imap.close() | |
self.send_auto_reply(message_from_bytes(data[0][1])) | |
self.imap.select(readonly=False) | |
self.imap.store(mail_number, '+FLAGS', '\\Answered') | |
self.imap.close() | |
def check_mails(self): | |
self.imap.select(readonly=False) | |
_, data = self.imap.search(None, '(UNSEEN UNANSWERED)') | |
self.imap.close() | |
for mail_number in data[0].split(): | |
self.reply(mail_number) | |
time.sleep(10) # Rate Limit prevention | |
def run(self): | |
self.create_table() | |
print ('Now listening... Blocking rebounds for ' + str(self.blockhours) + ' hours') | |
try: | |
while True: | |
self.check_mails() | |
sleep(self.refresh_delay) | |
finally: | |
self.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
See: https://github.com/praul/autoreply for new full version