Skip to content

Instantly share code, notes, and snippets.

@praul
Last active November 2, 2023 20:00
Show Gist options
  • Save praul/68710409583eb19c8c1b12c12a9302ff to your computer and use it in GitHub Desktop.
Save praul/68710409583eb19c8c1b12c12a9302ff to your computer and use it in GitHub Desktop.
Python out-of-office notice with rebound protection
"""
Automatically replies to mails both unread and unanswered.
This script has a rebound and self mail protection. Also a rate limit protection.
You can adjust the time value with the variable blockhours
Thanks to Bertrand Bordage for the original gist: https://gist.github.com/BertrandBordage/e07e5fe8191590daafafe0dbb05b5a7b
WARNING: This replies to every mail that is both unread and unanswered, even if it is years old.
Don’t use on a mailbox with old messages left unread and unanswered.
Simply subclass ``AutoReplyer``, define the undefined class attribute,
and call the ``run`` method on an instance. This loops until you stop the script
(using Ctrl+C, typically) or until an error occurs, like a network failure.
Example usage:
from autoreplyer import AutoReplyer
class YourAutoReplyer(AutoReplyer):
imap_server = 'your.imap.server'
imap_port = 1234 # Custom port, only use if the default one doesn’t work.
imap_user = 'you@example.com'
imap_password = 'your_password'
smtp_server = 'your.smtp.server'
smtp_port = 5678 # Custom port, only use if the default one doesn’t work.
smtp_user = 'you@example.com'
smtp_password = 'your_password'
blockhours = 12 # Hours during which a sender who already got an auto-reply is not answered again (rebound protection).
mymail = 'you@example.com'
from_address = 'Your Displayed Name <you@example.com>'
body = '''
Hello,
I’m on vacation until a date I should mention here.
You’ll have an answer when I’m back.
Call Django in case of emergency.
Have a nice day,
You
'''
body_html = '''
<p>Hello</p>,
<p>
I’m on vacation until a date I should mention here.<br />
You’ll have an answer when I’m back.<br />
Call <a href="tel:+1234567890">Django</a> in case of emergency.
</p>
<p>
Have a nice day,<br />
You
</p>
'''
while True:
try:
YourAutoReplyer().run()
except KeyboardInterrupt:
exit()
"""
import sqlite3
import time
from datetime import datetime, timedelta
from email import message_from_bytes
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import make_msgid, parseaddr
from imaplib import IMAP4, IMAP4_SSL, IMAP4_PORT, IMAP4_SSL_PORT
from os import execlp
from smtplib import SMTP, SMTP_SSL, SMTP_PORT, SMTP_SSL_PORT
from subprocess import call
from textwrap import dedent
from time import sleep
# Module metadata: authorship, copyright and license.
__author__ = 'Bertrand Bordage (modified: praul)'
__copyright__ = 'Copyright © 2016 Bertrand Bordage'
__license__ = 'MIT'
class AutoReplyer:
    """Out-of-office auto-responder: polls an IMAP mailbox, replies over SMTP.

    Subclass it, define the server, credential, address and body attributes,
    then call ``run()`` on an instance. Every mail that is both unseen and
    unanswered gets one reply, except that:

    * mail whose sender equals ``mymail`` is never answered, and
    * a sender that was already answered within the last ``blockhours`` hours
      is not answered again (rebound protection, tracked in a SQLite file).

    Note that ``__init__`` logs in to both servers immediately.
    """

    refresh_delay = 60  # seconds between two mailbox polls

    imap_server = None
    imap_use_ssl = False
    imap_port = IMAP4_PORT
    imap_ssl_port = IMAP4_SSL_PORT
    imap_user = None
    imap_password = None

    smtp_server = None
    smtp_use_ssl = False
    smtp_port = SMTP_PORT
    smtp_ssl_port = SMTP_SSL_PORT
    smtp_user = None
    smtp_password = None

    from_address = None
    body = None
    body_html = None

    # SQLite connection/cursor; (re)created by db_connect() for each operation.
    con = None
    cur = None

    blockhours = None  # hours during which a sender is not answered twice
    mymail = None  # own address; mail coming from it is never answered

    db_path = 'autoreply.db'  # SQLite file remembering answered senders
    send_attempts = 5  # tries per reply before the send error is re-raised
    send_retry_delay = 30  # seconds to wait before reconnecting after a failed send
    mail_delay = 10  # seconds to wait between two processed mails

    # Timestamp layout used in the ``senders.date`` column.
    _DATE_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

    def __init__(self):
        self.login()

    def login(self):
        """Open and authenticate the IMAP and SMTP connections."""
        if self.imap_use_ssl:
            self.imap = IMAP4_SSL(self.imap_server, self.imap_ssl_port)
        else:
            self.imap = IMAP4(self.imap_server, self.imap_port)
        self.imap.login(self.imap_user, self.imap_password)
        if self.smtp_use_ssl:
            self.smtp = SMTP_SSL(self.smtp_server, self.smtp_ssl_port)
        else:
            self.smtp = SMTP(self.smtp_server, self.smtp_port)
        self.smtp.login(self.smtp_user, self.smtp_password)

    def close(self):
        """Close the SMTP connection and log out of IMAP."""
        self.smtp.close()
        self.imap.logout()

    @classmethod
    def _parse_timestamp(cls, value):
        """Parse a stored timestamp, with or without fractional seconds."""
        text = str(value)
        try:
            return datetime.strptime(text, cls._DATE_FORMAT)
        except ValueError:
            # Rows written through sqlite3's implicit datetime adapter have no
            # ".ffffff" part when the microsecond field happened to be 0.
            return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")

    def check_reply_timeout(self, sender):
        """Decide whether ``sender`` may receive an auto-reply right now.

        Returns ``False`` for mail from ``mymail`` and for senders that have
        an entry younger than ``blockhours`` hours. Otherwise the sender is
        recorded with the current time and ``True`` is returned. Entries older
        than ``blockhours`` hours are deleted along the way.
        """
        print ('-------------------------------------------------------')
        print ('Incoming mail from ' + sender + '. Checking history....')

        # Check for mail from self
        if sender == self.mymail:
            print ('Mail from self. Not sending any mail')
            return False

        # Check for recent incoming mails from this address
        self.db_connect()
        try:
            breakdate = datetime.now() - timedelta(hours=self.blockhours)
            # Fetch everything first: running the DELETE below on the cursor
            # that is still being iterated would discard the remaining rows.
            rows = self.cur.execute(
                "SELECT id,date FROM senders WHERE mail=?", (sender,)
            ).fetchall()

            send = True
            for row_id, stamp in rows:
                then = self._parse_timestamp(stamp)
                print ('Found ' + sender + ' at ' + str(stamp) + ' - ID ' + str(row_id))
                if then < breakdate:  # older than the block window: forget it
                    print ('Last entry ' + str(row_id) + ' from ' + sender + ' is old. Delete...' )
                    # Parameters must be a sequence; a bare string would be
                    # bound character by character.
                    self.cur.execute("DELETE FROM senders WHERE id=?", (row_id,))
                else:  # recent: reject
                    print ('Recent entry found. Not sending any mail' )
                    send = False

            if send:
                print ('Memorizing ' + sender)
                # Store an explicit string so the format never depends on the
                # sqlite3 default adapter.
                self.cur.execute(
                    "INSERT INTO senders (mail, date) values (?, ?)",
                    (sender, datetime.now().strftime(self._DATE_FORMAT)),
                )
            # Commit in both cases so deletions of stale entries are kept.
            self.con.commit()
        finally:
            self.con.close()
        return send

    def db_connect(self):
        """Open the SQLite database and store connection and cursor on self."""
        self.con = sqlite3.connect(
            self.db_path,
            detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
        )
        self.cur = self.con.cursor()

    def create_table(self):
        """Create the ``senders`` table if it does not exist yet."""
        self.db_connect()
        try:
            self.cur.execute(
                'CREATE TABLE IF NOT EXISTS senders '
                '(id INTEGER PRIMARY KEY, mail text, date datetime)'
            )
            self.con.commit()
        finally:
            self.con.close()

    def create_auto_reply(self, original):
        """Build the multipart (plain + HTML) reply to the message ``original``."""
        mail = MIMEMultipart('alternative')
        mail['Message-ID'] = make_msgid()
        message_id = original['Message-ID']
        if message_id:
            # Thread the reply only when there is something to refer to.
            mail['References'] = mail['In-Reply-To'] = message_id
        # A message without Subject header yields None here.
        mail['Subject'] = 'Re: ' + str(original['Subject'] or '')
        mail['From'] = self.from_address
        mail['To'] = original['Reply-To'] or original['From']
        mail.attach(MIMEText(dedent(self.body), 'plain'))
        mail.attach(MIMEText(self.body_html, 'html'))
        return mail

    def send_auto_reply(self, original):
        """Send the auto-reply for ``original`` unless the sender is blocked.

        A failed send closes both connections, waits ``send_retry_delay``
        seconds, logs in again and retries, up to ``send_attempts`` tries in
        total; the error of the last try is re-raised.
        """
        # parseaddr handles both "Name <user@host>" and a bare "user@host".
        sender = parseaddr(str(original['From'] or ''))[1]
        print (sender)
        if not sender:
            print ('No usable sender address found. Not sending any mail')
            return
        if not self.check_reply_timeout(sender):
            return

        # Send with rate limit & error protection
        for attempt in range(1, self.send_attempts + 1):
            try:
                self.smtp.sendmail(
                    self.from_address, [original['From']],
                    self.create_auto_reply(original).as_bytes())
            except OSError:
                # smtplib's SMTPException derives from OSError, so this covers
                # both protocol-level refusals and socket errors.
                if attempt == self.send_attempts:
                    raise
                self.close()
                print ('Error on send (rate limit?). Wait %ss and reconnect....'
                       % self.send_retry_delay)
                time.sleep(self.send_retry_delay)
                self.login()
            else:
                break

        log = 'Replied to “%s” for the mail “%s”' % (original['From'],
                                                     original['Subject'])
        print(log)
        try:
            call(['notify-send', log])
        except FileNotFoundError:
            # No notify-send executable available; the printed log is enough.
            pass

    def reply(self, mail_number):
        """Fetch message ``mail_number``, auto-reply, and flag it as answered."""
        self.imap.select(readonly=True)
        _, data = self.imap.fetch(mail_number, '(RFC822)')
        self.imap.close()
        if not data or data[0] is None:
            # NOTE(review): presumably the message vanished between search and
            # fetch; nothing to reply to.
            return
        self.send_auto_reply(message_from_bytes(data[0][1]))
        self.imap.select(readonly=False)
        self.imap.store(mail_number, '+FLAGS', '\\Answered')
        self.imap.close()

    def check_mails(self):
        """Process every message that is both unseen and unanswered."""
        self.imap.select(readonly=False)
        _, data = self.imap.search(None, '(UNSEEN UNANSWERED)')
        self.imap.close()
        for mail_number in data[0].split():
            self.reply(mail_number)
            time.sleep(self.mail_delay)  # Rate limit prevention

    def run(self):
        """Poll the mailbox forever; connections are closed on the way out."""
        self.create_table()
        print ('Now listening... Blocking rebounds for ' + str(self.blockhours) + ' hours')
        try:
            while True:
                self.check_mails()
                sleep(self.refresh_delay)
        finally:
            self.close()
@praul
Copy link
Author

praul commented Jun 28, 2021

See: https://github.com/praul/autoreply for new full version

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment