|
#!/usr/bin/env python3 |
|
|
|
# License: |
|
# Free to modify and reuse at your own risks. |
|
# Keep link to original location: https://gist.github.com/f5395c25b7394822f098084ad7cdbbe8 |
|
|
|
import argparse |
|
import email |
|
import imaplib |
|
import smtplib |
|
import ssl |
|
import time |
|
from datetime import datetime, timedelta, timezone |
|
from email.header import decode_header |
|
from time import mktime |
|
|
|
|
|
def decode_subject(encoded_subject): |
|
"""Decode the subject of an email message.""" |
|
if isinstance(encoded_subject, bytes): |
|
encoded_subject = encoded_subject.decode("utf-8") |
|
decoded_parts = decode_header(encoded_subject) |
|
decoded_subject = "" |
|
for part in decoded_parts: |
|
if isinstance(part[0], bytes): |
|
decoded_subject += part[0].decode(part[1] or "utf-8") |
|
else: |
|
decoded_subject += part[0] |
|
return decoded_subject |
|
|
|
|
|
def check_email(imap_host, imap_port, imap_user, imap_password, subject, timeout): |
|
"""Check the email server for new messages matching the specified subject and received within the specified time period.""" |
|
context = ssl.create_default_context() |
|
with imaplib.IMAP4_SSL(imap_host, imap_port) as imap_server: |
|
imap_server.login(imap_user, imap_password) |
|
imap_server.select() |
|
start_time = datetime.now(timezone.utc) - timedelta(minutes=timeout) |
|
start_time_str = start_time.strftime("%d-%b-%Y") |
|
escaped_subject = subject.replace('"', '\\"') |
|
search_string = '(SUBJECT "{}" SINCE "{}")'.format( |
|
escaped_subject, start_time_str |
|
) |
|
# print(search_string) |
|
status, email_ids = imap_server.search(None, search_string) |
|
if status != "OK": |
|
return False |
|
# print(email_ids) |
|
email_ids = email_ids[0].split() |
|
# Sort in reverse numerical order - biggest is likely most recent |
|
email_ids = sorted(email_ids, key=lambda x: int(x), reverse=True) |
|
for email_id in email_ids: |
|
result, email_data = imap_server.fetch(email_id, "(INTERNALDATE)") |
|
if result == "OK" and email_data[0]: |
|
t_struct = imaplib.Internaldate2tuple(email_data[0]) |
|
email_time = datetime.fromtimestamp(mktime(t_struct), timezone.utc) |
|
# print("{}>={} Diff: {}".format(email_time,start_time,(email_time-start_time))) |
|
if email_time >= start_time: |
|
# New email with specified subject found |
|
print("Mail OK") |
|
return True |
|
|
|
print("Mail NOK") |
|
return False |
|
|
|
|
|
def get_emails(args): |
|
"""Get the list of email addresses to send the notification to.""" |
|
if args.emails: |
|
return args.emails.split(",") |
|
else: |
|
return [args.user] |
|
|
|
|
|
def send_notification(emails, smtp_host, smtp_port, smtp_user, smtp_password, subject): |
|
"""Send a notification email to the specified email addresses.""" |
|
message = """\ |
|
Subject: [EMAIL_ALERT] Erreur lors de la surveilance du serveur mail |
|
From: {1} |
|
To: {2} |
|
Content-Type: text/plain; charset="utf-8" |
|
|
|
Le serveur mail est peut-être en défaut. |
|
|
|
Sujet manquant: '{0}'""".format( |
|
subject, smtp_user, ", ".join(emails) |
|
) |
|
print(message) |
|
context = ssl.create_default_context() |
|
with smtplib.SMTP_SSL(smtp_host, smtp_port, context=context) as server: |
|
server.login(smtp_user, smtp_password) |
|
server.sendmail(smtp_user, emails, message.encode('utf-8')) |
|
|
|
|
|
def main(): |
|
parser = argparse.ArgumentParser( |
|
description="Monitor an email account for new messages matching specified criteria." |
|
) |
|
parser.add_argument("--imap-host", required=True, help="IMAP server host name") |
|
parser.add_argument( |
|
"--imap-port", default=993, type=int, help="IMAP server port number" |
|
) |
|
parser.add_argument("--smtp-host", help="SMTP server host name") |
|
parser.add_argument( |
|
"--smtp-port", default=465, type=int, help="SMTP server port number" |
|
) |
|
parser.add_argument("--user", required=True, help="Email account user name") |
|
parser.add_argument("--password", required=True, help="Email account password") |
|
parser.add_argument( |
|
"--subject", required=True, help="Subject line of the email to monitor" |
|
) |
|
parser.add_argument( |
|
"--timeout", |
|
required=True, |
|
type=int, |
|
help="Time period in minutes to search for emails", |
|
) |
|
parser.add_argument( |
|
"--emails", |
|
help="Comma-separated list of email addresses to send the notification to", |
|
) |
|
args = parser.parse_args() |
|
|
|
# Check email and send notification if needed |
|
if not check_email( |
|
args.imap_host, |
|
args.imap_port, |
|
args.user, |
|
args.password, |
|
args.subject, |
|
args.timeout, |
|
): |
|
smtp_host = args.smtp_host or args.imap_host |
|
send_notification( |
|
get_emails(args), |
|
smtp_host, |
|
args.smtp_port, |
|
args.user, |
|
args.password, |
|
args.subject, |
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |