Skip to content

Instantly share code, notes, and snippets.

@mdeweerd
Last active September 21, 2023 21:34
Show Gist options
  • Save mdeweerd/f5395c25b7394822f098084ad7cdbbe8 to your computer and use it in GitHub Desktop.
Save mdeweerd/f5395c25b7394822f098084ad7cdbbe8 to your computer and use it in GitHub Desktop.
Monitor Email Service by Checking Incoming on IMAP Server

Purpose

This script is intended to be part of a mechanism that monitors the proper operation of a system that sends emails.

It checks that the IMAP Server received a message with a certain SUBJECT in the last TIMEOUT minutes.
If such a message is missing during the TIMEOUT time frame, it will send an email notification.

It therefore expects that some other application sends messages with the given SUBJECT with a periodicity that is slightly less than TIMEOUT minutes.

It can help detect the following:

  • That a mail server is down or limited (quota reached);
  • That an application can no longer send emails (credentials, server access);
  • That an application no longer sends functional alerts (by simulating alert generating event sources on virtual inputs or devices).

Use

Add monitor_email.py to your system.

Add a file to /etc/cron.d/ as shown below. There are, of course, other ways to execute the script.

SHELL=/bin/sh
SCRIPT_ROOT=/path/to/scripts/
LOG_DIR=/path/to/logs
MONITOR_SUBJECT="Monitor Test Mail"
MAIL_SERVER="imap.example.com"
MAIL_USER="monitor@example.com"
MAIL_PASSWORD="Password"
NOTIFICATION_MAILS="admin@example.com"

# m h dom mon dow       user    command
# `sendmonitoremail` is not provided; it stands for a script that sends, or triggers the sending of, the monitoring message.
# It could be configured on, and run from, your application server, which may be different from the monitoring server.
*/15       *   * * *    some_user  ${SCRIPT_ROOT}/sendmonitoremail $MAIL_USER "$MONITOR_SUBJECT" > $LOG_DIR/sendMonitorMail.log 2>&1
5,20,35,50       *   * * *      some_user  ${SCRIPT_ROOT}/monitor_email.py --imap-host $MAIL_SERVER --timeout 20 --user $MAIL_USER --password "$MAIL_PASSWORD" --emails "$NOTIFICATION_MAILS" --subject "$MONITOR_SUBJECT" > $LOG_DIR/monitorMail.log 2>&1

Note

This is a quick and not so dirty script which can be improved upon, but does the job.

#!/usr/bin/env python3
# License:
# Free to modify and reuse at your own risks.
# Keep link to original location: https://gist.github.com/f5395c25b7394822f098084ad7cdbbe8
import argparse
import email
import imaplib
import smtplib
import ssl
import time
from datetime import datetime, timedelta, timezone
from email.header import decode_header
from time import mktime
def decode_subject(encoded_subject):
    """Decode the subject of an email message into a plain string.

    The value is split with ``email.header.decode_header`` and every byte
    fragment is decoded with the charset announced for it (UTF-8 when none
    is announced).

    Args:
        encoded_subject: Raw Subject header value, as ``str`` or ``bytes``.
            ``None`` (header absent) is treated as an empty subject.

    Returns:
        The decoded subject as ``str``. Bytes that cannot be decoded are
        replaced by U+FFFD instead of raising.
    """
    if encoded_subject is None:
        return ""
    if isinstance(encoded_subject, bytes):
        # A malformed header must not abort the whole monitoring run.
        encoded_subject = encoded_subject.decode("utf-8", errors="replace")

    fragments = []
    for payload, charset in decode_header(encoded_subject):
        if isinstance(payload, bytes):
            try:
                payload = payload.decode(charset or "utf-8")
            except (LookupError, UnicodeDecodeError):
                # LookupError: charset label unknown to Python.
                # UnicodeDecodeError: bytes do not match the announced charset.
                payload = payload.decode("utf-8", errors="replace")
        fragments.append(payload)
    return "".join(fragments)
def _build_search_criteria(subject, start_time):
    """Return the IMAP SEARCH criteria selecting *subject* around *start_time*."""
    # Fixed English month names: the IMAP date format does not depend on
    # the process locale, unlike strftime("%b").
    months = (
        "Jan", "Feb", "Mar", "Apr", "May", "Jun",
        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    )
    # SINCE only has day resolution and the server compares it with the
    # message's internal date without applying the timezone, so search one
    # day earlier; the caller applies the exact cut-off afterwards.
    since = start_time - timedelta(days=1)
    since_str = "{:02d}-{}-{:04d}".format(
        since.day, months[since.month - 1], since.year
    )
    # Inside an IMAP quoted string both backslash and double quote must be
    # escaped; the backslash has to be handled first.
    escaped_subject = subject.replace("\\", "\\\\").replace('"', '\\"')
    return '(SUBJECT "{}" SINCE "{}")'.format(escaped_subject, since_str)


def check_email(imap_host, imap_port, imap_user, imap_password, subject, timeout):
    """Check that a message with the given subject arrived recently.

    Logs in over IMAP/TLS, selects the default mailbox, searches for messages
    whose subject contains *subject* and compares the INTERNALDATE of every
    candidate with "now minus *timeout* minutes".

    Args:
        imap_host: IMAP server host name.
        imap_port: IMAP-over-TLS port number.
        imap_user: Account used to log in.
        imap_password: Password for *imap_user*.
        subject: Subject text to search for.
        timeout: Size of the accepted time window, in minutes.

    Returns:
        ``True`` when at least one matching message was received within the
        window, ``False`` otherwise. The outcome is also printed as
        ``Mail OK`` / ``Mail NOK``.

    Connection, TLS and login errors raised by ``imaplib`` are not caught
    here and propagate to the caller.
    """
    # Pass the context explicitly so the server certificate and host name
    # are verified.
    context = ssl.create_default_context()
    start_time = datetime.now(timezone.utc) - timedelta(minutes=timeout)
    search_string = _build_search_criteria(subject, start_time)

    with imaplib.IMAP4_SSL(imap_host, imap_port, ssl_context=context) as imap_server:
        imap_server.login(imap_user, imap_password)
        imap_server.select()
        status, email_ids = imap_server.search(None, search_string)
        if status != "OK" or not email_ids or email_ids[0] is None:
            print("Mail NOK")
            return False

        # Highest sequence number first - it is likely the most recent one.
        candidates = sorted(email_ids[0].split(), key=int, reverse=True)
        for email_id in candidates:
            result, email_data = imap_server.fetch(email_id, "(INTERNALDATE)")
            if result != "OK" or not email_data or not email_data[0]:
                continue
            t_struct = imaplib.Internaldate2tuple(email_data[0])
            if t_struct is None:
                # Response did not contain a parsable INTERNALDATE.
                continue
            email_time = datetime.fromtimestamp(mktime(t_struct), timezone.utc)
            if email_time >= start_time:
                # New email with specified subject found
                print("Mail OK")
                return True

    print("Mail NOK")
    return False
def get_emails(args):
    """Get the list of email addresses to send the notification to.

    Args:
        args: Parsed command-line arguments; reads ``args.emails`` (a
            comma-separated string or ``None``) and ``args.user``.

    Returns:
        The addresses from ``args.emails`` with surrounding whitespace
        removed and empty entries dropped. Falls back to ``[args.user]``
        when no usable address was given.
    """
    if args.emails:
        # "a@x, b@y," must not yield " b@y" or "" as recipients.
        stripped = (address.strip() for address in args.emails.split(","))
        recipients = [address for address in stripped if address]
        if recipients:
            return recipients
    return [args.user]
def send_notification(emails, smtp_host, smtp_port, smtp_user, smtp_password, subject):
    """Send a notification email to the specified email addresses.

    The alert is built with ``email.message.EmailMessage`` so that headers
    and body are properly separated and encoded, and so that ``Date`` and
    ``Message-ID`` headers are present.

    Args:
        emails: List of recipient addresses.
        smtp_host: SMTP server host name (SMTP over TLS).
        smtp_port: SMTP server port number.
        smtp_user: Login name; also used as sender address.
        smtp_password: Password for *smtp_user*.
        subject: The monitored subject that was not found; quoted in the body.

    Connection, TLS, login and delivery errors raised by ``smtplib`` are not
    caught here and propagate to the caller.
    """
    # Function-scope imports: only needed on the alert path.
    from email.message import EmailMessage
    from email.utils import formatdate, make_msgid

    body = "Le serveur mail est peut-\u00eatre en d\u00e9faut.\nSujet manquant: '{0}'".format(
        subject
    )

    message = EmailMessage()
    message["Subject"] = "[EMAIL_ALERT] Erreur lors de la surveilance du serveur mail"
    message["From"] = smtp_user
    message["To"] = ", ".join(emails)
    message["Date"] = formatdate(localtime=True)
    # Use the sender's domain for the Message-ID when there is one; otherwise
    # make_msgid falls back to the local host name.
    sender_domain = smtp_user.rpartition("@")[2] if "@" in smtp_user else None
    message["Message-ID"] = make_msgid(domain=sender_domain)
    message.set_content(body)

    # Log a human-readable copy (the wire format may be transfer-encoded).
    print(
        "Subject: {}\nFrom: {}\nTo: {}\n\n{}".format(
            message["Subject"], message["From"], message["To"], body
        )
    )

    context = ssl.create_default_context()
    with smtplib.SMTP_SSL(smtp_host, smtp_port, context=context) as server:
        server.login(smtp_user, smtp_password)
        server.send_message(message, from_addr=smtp_user, to_addrs=emails)
def main():
    """Parse the command line, check the mailbox and alert when needed.

    Calls ``check_email`` with the IMAP settings and, when no recent message
    with the monitored subject is found, calls ``send_notification``. The
    SMTP host defaults to the IMAP host, and the IMAP credentials are reused
    for SMTP.

    A failure to reach or log in to the IMAP server is treated like a
    missing message, so the notification is still attempted.
    """
    parser = argparse.ArgumentParser(
        description="Monitor an email account for new messages matching specified criteria."
    )
    parser.add_argument("--imap-host", required=True, help="IMAP server host name")
    parser.add_argument(
        "--imap-port", default=993, type=int, help="IMAP server port number"
    )
    parser.add_argument("--smtp-host", help="SMTP server host name")
    parser.add_argument(
        "--smtp-port", default=465, type=int, help="SMTP server port number"
    )
    parser.add_argument("--user", required=True, help="Email account user name")
    parser.add_argument("--password", required=True, help="Email account password")
    parser.add_argument(
        "--subject", required=True, help="Subject line of the email to monitor"
    )
    parser.add_argument(
        "--timeout",
        required=True,
        type=int,
        help="Time period in minutes to search for emails",
    )
    parser.add_argument(
        "--emails",
        help="Comma-separated list of email addresses to send the notification to",
    )
    args = parser.parse_args()

    # Check email and send notification if needed
    try:
        mail_ok = check_email(
            args.imap_host,
            args.imap_port,
            args.user,
            args.password,
            args.subject,
            args.timeout,
        )
    except (imaplib.IMAP4.error, OSError) as err:
        # An unreachable or refusing IMAP server is exactly the kind of
        # outage this monitor has to report - do not die silently.
        print("Mail NOK: {}".format(err))
        mail_ok = False

    if not mail_ok:
        smtp_host = args.smtp_host or args.imap_host
        send_notification(
            get_emails(args),
            smtp_host,
            args.smtp_port,
            args.user,
            args.password,
            args.subject,
        )
# Script entry point: run the monitor only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment