Skip to content

Instantly share code, notes, and snippets.

@isdyy
Last active December 15, 2015 16:29
Show Gist options
  • Save isdyy/5289518 to your computer and use it in GitHub Desktop.
Save isdyy/5289518 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-
# vim:et ts=4 ff=unix:
"""Simple/handy website downtime detection (using curl and sendmail)"""
import argparse
import datetime
import email
import email.utils
import json
import logging
import os
import re
import shlex
import string
import subprocess
import sys
import tempfile
logger = logging.getLogger(__name__)
_DEFAULT_TEMPLATE = """
Subject: [$servicename] $label$subject
$subject
URL: $url
$reason
""".strip()
class LastState(object):
def __init__(self, filename):
self.filename = filename
self._content = self._load(filename)
def is_success(self):
# defaults True for first time check
return True if self._content.get('status', True) else False
def save(self, state, **kwds):
kwds['status'] = bool(state)
with open(self.filename, "w") as fh:
fh.write(json.dumps(kwds))
self._content = kwds
def _load(self, filename):
try:
with open(filename, "r") as fh:
return json.loads(fh.read())
except (IOError, ValueError):
return {}
class TemplateMessage(object):
def __init__(self, text, sender=None, receivers=None, defaults=None):
self.sender = sender
self.receivers = receivers or []
self._template = text
self._defaults = defaults or {}
def render(self, **kwds):
params = self._defaults.copy()
params.update(kwds)
header, body = self._template.split('\n\n', 1)
message = email.message_from_string(
string.Template(
'\n'.join(
[line for line in header.split('\n')
if not line.startswith('#')]
+ ['', body])).safe_substitute(params))
if self.receivers:
for addr in self.receivers:
message.add_header('To', addr)
sender = self.sender
if not sender:
if message.get_unixfrom():
sender = message.get_unixfrom().split(' ').pop()
elif message.get('from'):
sender = email.utils.parseaddr(message.get('from'))[1]
else:
sender = None
return sender, message
def send_mail(args, subject, **kwds):
template = TemplateMessage(
args.template.read() if args.template else _DEFAULT_TEMPLATE,
sender=args.sender,
receivers=args.rcpts,
defaults=dict(
servicename=args.servicename,
subject=subject,
url=args.url,
reason='',
label=''))
sender, msg = template.render(**kwds)
command = [args.sendmail, "-t", "-oi"]
if sender:
command += ["-f", sender]
p1 = subprocess.Popen(["printf", msg.as_string()], stdout=subprocess.PIPE)
p2 = subprocess.Popen(command, stdin=p1.stdout)
p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exists.
out, err = p2.communicate()
if p2.returncode and err:
raise RuntimeError(err)
def on_error(args, error):
now = datetime.datetime.now().replace(microsecond=0)
send_mail(args, 'Service may be down.',
reason='Reason: %s\n' % error['out'], label='ERROR: ')
last = LastState(args.statefile)
last.save(False, time=now.isoformat())
logger.warn('\t'.join(('NG', args.url, error['out'])))
def main(argv=None):
parser = argparse.ArgumentParser(
description="Simple service downtime detection")
parser.add_argument(
"-n", "--name", dest="servicename",
help="Name of the service "
"(used as a subject prefix of notification emails)")
parser.add_argument(
"-r", "--rcpts", dest="rcpts", action="append", metavar="ADDR",
help="Email address(es) to send notification to")
parser.add_argument(
"-c", "--command", dest="sendmail", metavar="PATH",
help="Path to the sendmail command", default="/usr/sbin/sendmail")
parser.add_argument(
"-f", "--sender", dest="sender", metavar="ADDR",
help="Set the envelope sender address.")
parser.add_argument(
"-t", "--template", dest="template", type=argparse.FileType("r"),
help="Path to the notification email template file."
"If `-` given, read stdin.")
parser.add_argument(
"-s", "--state", dest="statefile",
help="Path to the state file")
parser.add_argument("url")
args = parser.parse_args(argv)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')
if not args.servicename:
args.servicename = re.sub(
r'^https?://([-a-zA-Z0-9.]+)/.*', '\\1', args.url)
if not args.statefile:
ident = re.sub(r'^https?://', '', args.url)
ident = re.sub(r'[^-\w]+', '-', ident)
args.statefile = os.path.join(
tempfile.gettempdir(), # Or more appropriate path?
'{name}.{ident}.state'.format(name=logger.name, ident=ident))
try:
cmd = shlex.split("curl -fs --show-error --retry 4 --retry-delay 1")
try:
# For debug, specify e.g. http://httpstat.us/500
subprocess.check_output(cmd + [args.url], stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
on_error(args, dict(url=args.url, out=e.output.strip()))
else:
on_success(args)
except Exception as exc:
logger.exception('%s: %s', exc.__class__.__name__, exc)
sys.exit(255)
if __name__ == '__main__':
main()
From: [email protected]
To: [email protected]
Subject: [$servicename] $label$subject
$subject
URL: $url
$reason
Detail: http://status.example.com
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment