Last active
December 15, 2015 16:29
-
-
Save isdyy/5289518 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2.7 | |
# -*- coding: utf-8 -*- | |
# vim:et ts=4 ff=unix: | |
"""Simple/handy website downtime detection (using curl and sendmail)""" | |
import argparse | |
import datetime | |
import email | |
import email.utils | |
import json | |
import logging | |
import os | |
import re | |
import shlex | |
import string | |
import subprocess | |
import sys | |
import tempfile | |
logger = logging.getLogger(__name__) | |
_DEFAULT_TEMPLATE = """ | |
Subject: [$servicename] $label$subject | |
$subject | |
URL: $url | |
$reason | |
""".strip() | |
class LastState(object): | |
def __init__(self, filename): | |
self.filename = filename | |
self._content = self._load(filename) | |
def is_success(self): | |
# defaults True for first time check | |
return True if self._content.get('status', True) else False | |
def save(self, state, **kwds): | |
kwds['status'] = bool(state) | |
with open(self.filename, "w") as fh: | |
fh.write(json.dumps(kwds)) | |
self._content = kwds | |
def _load(self, filename): | |
try: | |
with open(filename, "r") as fh: | |
return json.loads(fh.read()) | |
except (IOError, ValueError): | |
return {} | |
class TemplateMessage(object): | |
def __init__(self, text, sender=None, receivers=None, defaults=None): | |
self.sender = sender | |
self.receivers = receivers or [] | |
self._template = text | |
self._defaults = defaults or {} | |
def render(self, **kwds): | |
params = self._defaults.copy() | |
params.update(kwds) | |
header, body = self._template.split('\n\n', 1) | |
message = email.message_from_string( | |
string.Template( | |
'\n'.join( | |
[line for line in header.split('\n') | |
if not line.startswith('#')] | |
+ ['', body])).safe_substitute(params)) | |
if self.receivers: | |
for addr in self.receivers: | |
message.add_header('To', addr) | |
sender = self.sender | |
if not sender: | |
if message.get_unixfrom(): | |
sender = message.get_unixfrom().split(' ').pop() | |
elif message.get('from'): | |
sender = email.utils.parseaddr(message.get('from'))[1] | |
else: | |
sender = None | |
return sender, message | |
def send_mail(args, subject, **kwds): | |
template = TemplateMessage( | |
args.template.read() if args.template else _DEFAULT_TEMPLATE, | |
sender=args.sender, | |
receivers=args.rcpts, | |
defaults=dict( | |
servicename=args.servicename, | |
subject=subject, | |
url=args.url, | |
reason='', | |
label='')) | |
sender, msg = template.render(**kwds) | |
command = [args.sendmail, "-t", "-oi"] | |
if sender: | |
command += ["-f", sender] | |
p1 = subprocess.Popen(["printf", msg.as_string()], stdout=subprocess.PIPE) | |
p2 = subprocess.Popen(command, stdin=p1.stdout) | |
p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exists. | |
out, err = p2.communicate() | |
if p2.returncode and err: | |
raise RuntimeError(err) | |
def on_error(args, error): | |
now = datetime.datetime.now().replace(microsecond=0) | |
send_mail(args, 'Service may be down.', | |
reason='Reason: %s\n' % error['out'], label='ERROR: ') | |
last = LastState(args.statefile) | |
last.save(False, time=now.isoformat()) | |
logger.warn('\t'.join(('NG', args.url, error['out']))) | |
def main(argv=None): | |
parser = argparse.ArgumentParser( | |
description="Simple service downtime detection") | |
parser.add_argument( | |
"-n", "--name", dest="servicename", | |
help="Name of the service " | |
"(used as a subject prefix of notification emails)") | |
parser.add_argument( | |
"-r", "--rcpts", dest="rcpts", action="append", metavar="ADDR", | |
help="Email address(es) to send notification to") | |
parser.add_argument( | |
"-c", "--command", dest="sendmail", metavar="PATH", | |
help="Path to the sendmail command", default="/usr/sbin/sendmail") | |
parser.add_argument( | |
"-f", "--sender", dest="sender", metavar="ADDR", | |
help="Set the envelope sender address.") | |
parser.add_argument( | |
"-t", "--template", dest="template", type=argparse.FileType("r"), | |
help="Path to the notification email template file." | |
"If `-` given, read stdin.") | |
parser.add_argument( | |
"-s", "--state", dest="statefile", | |
help="Path to the state file") | |
parser.add_argument("url") | |
args = parser.parse_args(argv) | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s') | |
if not args.servicename: | |
args.servicename = re.sub( | |
r'^https?://([-a-zA-Z0-9.]+)/.*', '\\1', args.url) | |
if not args.statefile: | |
ident = re.sub(r'^https?://', '', args.url) | |
ident = re.sub(r'[^-\w]+', '-', ident) | |
args.statefile = os.path.join( | |
tempfile.gettempdir(), # Or more appropriate path? | |
'{name}.{ident}.state'.format(name=logger.name, ident=ident)) | |
try: | |
cmd = shlex.split("curl -fs --show-error --retry 4 --retry-delay 1") | |
try: | |
# For debug, specify e.g. http://httpstat.us/500 | |
subprocess.check_output(cmd + [args.url], stderr=subprocess.STDOUT) | |
except subprocess.CalledProcessError as e: | |
on_error(args, dict(url=args.url, out=e.output.strip())) | |
else: | |
on_success(args) | |
except Exception as exc: | |
logger.exception('%s: %s', exc.__class__.__name__, exc) | |
sys.exit(255) | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From: [email protected] | |
To: [email protected] | |
Subject: [$servicename] $label$subject | |
$subject | |
URL: $url | |
$reason | |
Detail: http://status.example.com |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment