-
-
Save darkseed/1001425 to your computer and use it in GitHub Desktop.
Chatterbox - System monitoring through social networking
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Chatterbox - System monitoring through Social Networking. | |
Chatterbox uses Twitter, IM and eventually other "social" channels to | |
send you information about your systems, so that you can keep in touch | |
with how they're performing without lots of messages in your e-mail inbox | |
or on your pager. | |
Currently, it only looks for interesting conditions in Squid logs. It's also not really... done. | |
""" | |
__author__ = "Mark Nottingham <[email protected]>" | |
__copyright__ = """\ | |
Copyright (c) 2010 Mark Nottingham | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in | |
all copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
THE SOFTWARE. | |
""" | |
import ConfigParser | |
import cPickle as pickle | |
import fcntl | |
import gzip | |
import logging | |
import os | |
import signal | |
import sys | |
import time | |
from collections import defaultdict | |
from functools import partial | |
from logging import StreamHandler | |
from urlparse import urlsplit | |
# try to get the epollreactor | |
try: | |
from twisted.internet import epollreactor | |
epollreactor.install() | |
except ImportError: | |
pass | |
from twisted.internet import reactor, stdio | |
from twisted.protocols.basic import LineReceiver | |
from twisted.internet.protocol import connectionDone | |
from twisted.words.protocols.jabber import client, jid, xmlstream | |
from twisted.words.xish import domish | |
from twisted.python import log as twisted_log | |
from twittytwister import twitter # http://github.com/dustin/twitty-twister | |
# requires http://github.com/leah/python-oauth | |
############################################################################ | |
class Chatterbox:
    "The Chatterbox coordinator."

    def __init__(self, re_actor, config, log):
        self.reactor = re_actor
        self.config = config
        self.log = log
        self.state = AttrDict()  # things that get persisted between invocations
        self.shutting_down = False

    def start(self):
        """
        Start chatterbox.

        Installs the monitors, opens every configured channel, loads persisted
        state and then runs the reactor; this call blocks until it stops.
        """
        self.log.info("start")
        self.monitors = [SquidLogWatcher(self)]
        # Only open channels whose config section exists; otherwise a missing
        # section (e.g. [jabber]) aborts startup with NoSectionError.
        channels = []
        if self.config.has_section("twitter"):
            channels.append(TweetBucket(self))
        else:
            self.log.info("No [twitter] section; Twitter channel disabled.")
        if self.config.has_section("jabber"):
            channels.append(JabberClient(self))
        else:
            self.log.info("No [jabber] section; Jabber channel disabled.")
        self.channels = Channels(*channels)
        self.load_state()  # call AFTER anything that might set default in state
        self.channels.online(True)
        self.reactor.run(installSignalHandlers=False)

    def shutdown(self):
        "Stop chatterbox: say goodbye on all channels, persist state, stop the reactor."
        # TODO: is shutting_down necessary? Check twisted semantics...
        if self.shutting_down or not self.reactor.running:
            return
        self.shutting_down = True
        self.log.info("stop")
        self.channels.online(False)
        self.save_state()
        # FIXME: yes, this is a race condition (save_state() may have
        # rescheduled itself, and channel messages may still be in flight).
        self.reactor.callLater(3, self.reactor.stop)

    def _lock_state_file(self, dbfile):
        """
        Try to take an exclusive, non-blocking flock on the state file.

        Returns the open lock handle, or None if the file couldn't be opened or
        is locked elsewhere. On failure the handle is closed so that retries
        don't leak file descriptors.
        """
        db_lock = None
        try:
            db_lock = open(dbfile, 'a')  # 'a' creates the file without truncating it
            fcntl.flock(db_lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            if db_lock is not None:
                db_lock.close()
            return None
        return db_lock

    def _read_state(self, dbfile):
        """
        Unpickle the state file, which may or may not be gzip-compressed.

        Returns the unpickled object, or None (after logging why) if it could
        not be read or decoded.

        NOTE(review): unpickling runs arbitrary code if the file was tampered
        with; keep "statefile" somewhere only this user can write (not /tmp).
        """
        try:
            db = gzip.open(dbfile, 'rb')
            try:
                # taste the file to see if it's compressed
                try:
                    db.read(1)
                    db.seek(0)
                except (IOError, EOFError):
                    # not gzipped; reopen as a plain file
                    db.close()
                    db = open(dbfile, 'rb')
                try:
                    return pickle.load(db)
                except (ValueError, AttributeError, ImportError, IndexError,
                        pickle.PickleError) as why:
                    self.log.error("State is corrupt! (%s)" % why)
                    return None
            finally:
                db.close()
        except (IOError, EOFError) as why:
            # EOFError here usually means an empty (freshly created) file.
            self.log.warning("State read problem. (%s)" % why)
            return None

    def _write_state(self, dbfile):
        "Pickle self.state into dbfile; return True on success, False after logging a failure."
        try:
            if self.config.getboolean('main', 'use-gzip'):
                db = gzip.open(dbfile, 'wb', 6)
            else:
                db = open(dbfile, 'wb')
            try:
                pickle.dump(self.state, db, pickle.HIGHEST_PROTOCOL)
            except pickle.PickleError as why:
                self.log.error("Can't write state! (%s)" % why)
                return False
            finally:
                db.close()  # inside the outer try so close() errors are reported too
            return True
        except IOError as why:
            self.log.warning("Problems writing state! (%s)" % why)
            return False

    def load_state(self):
        """
        Load any persisted state, merging it into self.state.

        If the state file is locked, the load is retried one second later via
        the reactor.
        """
        dbfile = self.config.get("main", "statefile")
        db_lock = self._lock_state_file(dbfile)
        if db_lock is None:
            self.log.info("State file is locked; retrying read...")
            self.reactor.callLater(1, self.load_state)
            return
        start_time = time.time()
        try:
            state = self._read_state(dbfile)
        finally:
            db_lock.close()
        if state is None:
            return
        # Update in place rather than rebinding: monitors such as SquidStats keep
        # a reference to self.state taken before this runs, and defaults they set
        # should survive unless the persisted state overrides them.
        self.state.update(state)
        self.log.info("State loaded in %3.3f seconds." % (time.time() - start_time))

    def save_state(self):
        """
        Persist state to the state file, gzipped when main/use-gzip is true.

        If the state file is locked, the save is retried one second later via
        the reactor.
        """
        dbfile = self.config.get("main", "statefile")
        db_lock = self._lock_state_file(dbfile)
        if db_lock is None:
            self.log.info("State file is locked; retrying write...")
            self.reactor.callLater(1, self.save_state)
            return
        start_time = time.time()
        try:
            saved = self._write_state(dbfile)
        finally:
            db_lock.close()
        if saved:
            self.log.info("State saved in %3.3f seconds." % (time.time() - start_time))

    def load_config_list(self, name, section, key):
        """
        Return config value section/key split on whitespace, logging the result.

        `name` is only used to label the log message.
        """
        value = self.config.get(section, key).split()
        self.log.info("Loaded %s %s: %s" % (section, name, ", ".join(value)))
        return value
############################################################################ | |
class Channels:
    """
    Fan-out proxy over a fixed group of channels.

    Looking up any attribute not defined here (e.g. `say`) yields a function
    that calls `channel_<attr>` on each held channel in turn, with the same
    arguments. Channels that lack that hook fall back to noop().
    """
    def __init__(self, *channels):
        self.channels = channels

    def __getattr__(self, attr):
        hook_name = "channel_%s" % attr

        def broadcast(*args, **kw):
            for target in self.channels:
                handler = getattr(target, hook_name, self.noop)
                handler(*args, **kw)
        return broadcast

    def noop(self, *args, **kw):
        "Fallback used when a channel doesn't implement the requested hook."
        sys.stderr.write("Chatterbox - noop!\n")
class Channel:
    """
    Base class for channels.

    Every hook here does nothing; subclasses override the ones they support.
    """
    def channel_online(self, active):
        "Notify of startup (active true) or shutdown (active false)."

    def channel_say(self, msg):
        "Tell people about something."

    def channel_alert(self, msg):
        "Alert admin to something bad."
class JabberClient(Channel):
    """
    Jabber (XMPP) channel.

    Flagged by its author as not working very well yet. Logs in as
    <username>/Chatterbox on port 5222 of the configured server and sends
    messages to whichever configured admins are currently online.
    """
    def __init__(self, mgr):
        self.mgr = mgr
        self.xmlstream = None  # set by authd() once authenticated
        username = mgr.config.get("jabber", "username").strip()
        password = mgr.config.get("jabber", "password").strip()
        server = mgr.config.get("jabber", "server").strip()
        self.admins = self.mgr.load_config_list("admins", "jabber", "admins")
        self.online_admins = set()  # full JIDs (bare JID + "/resource") seen online
        self.me = jid.JID(username + "/Chatterbox")
        self.factory = client.basicClientFactory(self.me, password)
        self.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.authd)
        self.factory.addBootstrap(client.BasicAuthenticator.AUTH_FAILED_EVENT, self.authfailedEvent)
        self.mgr.reactor.connectTCP(server, 5222, self.factory)
        mgr.log.debug("Jabber client started.")

    def authd(self, xmlstream):
        "Authenticated: announce presence, hook message/presence handlers, start keepalives."
        self.mgr.log.info("Jabber authenticated.")
        self.xmlstream = xmlstream
        presence = domish.Element(('jabber:client', 'presence'))
        presence.addElement('status').addContent('Online')
        xmlstream.send(presence)
        xmlstream.addObserver('/message', self.gotMessage)
        xmlstream.addObserver('/presence', self.gotPresence)
        self.sendKeepAlive()

    def authfailedEvent(self, xmlstream):
        "Log a failed login."
        self.mgr.log.error("Jabber authentication failed.")

    def connectionLost(self, reason):
        # NOTE(review): nothing in this class registers this as a callback, so it
        # may never run; sendKeepAlive() also keeps rescheduling after a drop.
        self.mgr.log.info("Jabber disconnected.")
        ### TODO: reconnect

    def gotMessage(self, message):
        "Log the (lower-cased) body of an incoming message; no commands are handled yet."
        body = None
        for e in message.elements():
            if e.name == "body":
                body = unicode(e.__str__()).strip().lower()
        self.mgr.log.debug("Jabber receive: %s" % body)

    def sendMessage(self, to, msg):
        "Send a chat message to `to`, retrying every 3 seconds until connected."
        if self.xmlstream:
            message = domish.Element(('jabber:client', 'message'))
            message["to"] = jid.JID(to).full()
            message["from"] = self.me.full()
            message["type"] = "chat"
            message.addElement("body", "jabber:client", msg)
            self.xmlstream.send(message)
        else:
            self.mgr.reactor.callLater(3, self.sendMessage, to, msg)

    def gotPresence(self, element):
        """
        Track which configured admins are online from a <presence/> stanza.

        A presence with no type means "available": the sender's full JID is
        added and greeted the first time it appears. type="unavailable"
        removes it. Other types (subscription requests, probes, errors) say
        nothing about availability and are ignored.
        """
        full_addr = element.getAttribute('from')
        if not full_addr:
            return
        addr = full_addr.split("/", 1)[0]
        if addr not in self.admins:
            return
        presence_type = element.getAttribute('type')
        if presence_type == 'unavailable':
            if full_addr in self.online_admins:
                self.mgr.log.debug("Jabber admin %s offline." % addr)
                self.online_admins.discard(full_addr)
            return
        if presence_type is not None:
            return
        if full_addr not in self.online_admins:
            # Only greet on the transition to online, not on every status change.
            self.mgr.log.debug("Jabber admin %s online." % addr)
            self.online_admins.add(full_addr)
            self.sendMessage(full_addr, 'hi.')

    def sendKeepAlive(self):
        "Nudge the connection with whitespace every 30 seconds."
        self.xmlstream.send(" ")
        self.mgr.reactor.callLater(30, self.sendKeepAlive)

    def channel_online(self, active):
        "Greet admins on startup and say goodbye on shutdown."
        if active:
            self.channel_say("Hi!")
        else:
            self.channel_say("Bye for now.")

    def channel_say(self, msg):
        "Send msg to every admin currently believed to be online."
        for admin in self.online_admins:
            self.sendMessage(admin, msg)
class TweetBucket(Channel, twitter.Twitter):
    """
    Twitter channel: posts messages as status updates.
    """
    def __init__(self, mgr):
        self.mgr = mgr
        username = mgr.config.get("twitter", "username").strip()
        password = mgr.config.get("twitter", "password").strip()
        twitter.Twitter.__init__(self, username, password)
        self.admins = self.mgr.load_config_list("admins", "twitter", "admins")
        mgr.log.debug("Twitter client started.")

    def channel_say(self, msg):
        "Tweet a message; success and failure are logged asynchronously."
        d = self.update(msg)
        d.addCallback(self.good_post)
        d.addErrback(self.bad_post)

    def channel_alert(self, msg):
        "Tweet an alert, asking the configured admins (if any) to look into it."
        if not self.admins:
            # With no admins configured (the default), don't tweet a dangling
            # " , can you look into it?" addressed to nobody.
            self.channel_say(msg)
            return
        self.channel_say("%s %s, can you look into it?" % (msg, " ".join(self.admins)))

    def good_post(self, post_id):
        "Tweet has succeeded."
        self.mgr.log.info("Posted tweet #%s." % post_id)

    def bad_post(self, problem):
        "Problem with the tweet."
        self.mgr.log.error("Problem posting tweet: %s" % problem)
############################################################################ | |
class SquidLogWatcher(LineReceiver):
    "Handles the Squid log daemon protocol on STDIN."
    delimiter = '\n'

    def __init__(self, mgr):
        self.mgr = mgr
        self.squid_stats = SquidStats(mgr)
        stdio.StandardIO(self)
        mgr.log.debug("Squid log listener started.")

    def lineReceived(self, line):
        """
        Process one line from Squid.

        The first character is the command. 'L' carries a log entry, which is
        fed to SquidStats. 'R' asks us to rotate our logs. Anything else
        (including empty lines) is ignored.
        """
        if not line:
            return
        code = line[0]
        if code == 'L':  # Log
            line = line[1:].rstrip()
            self.mgr.log.debug("log: %s" % line)
            try:
                self.squid_stats.add(line)
            except ValueError:
                self.mgr.log.error("Misformatted Squid log line received!")
        elif code == 'R':  # Rotate
            self.mgr.log.info("Rotating logs...")
            # Only some handlers (e.g. rotating file handlers) support rollover.
            for hdlr in self.mgr.log.handlers:
                rollover = getattr(hdlr, 'doRollover', None)
                if rollover is not None:
                    rollover()

    def connectionLost(self, reason=connectionDone):
        "Squid closed our stdin: treat it as the signal to shut down."
        self.mgr.shutdown()
class SquidStats:
    """
    Analyses incoming Squid log lines, keeps stats on them and
    alerts the user when interesting things happen.

    Stats live in a list of at most `max_buckets` buckets (dicts), each
    covering `bucket_width` seconds. gc() opens a new bucket every
    `bucket_width` seconds and drops the oldest.

    Squid config:
      logfile_daemon /path/to/chatterbox.py
      logformat chatterbox %ts %tr %>a %Ss %Hs %<st %rm %ru %mt %{Referer}>h
      access_log daemon:/path/to/config.txt chatterbox
    """
    # Fields of the "chatterbox" logformat above, in order.
    FIELD_NAMES = ('ts', 'latency', 'client_ip', 'squid_code', 'status',
                   'bytes', 'method', 'url', 'media_type', 'referer')

    def __init__(self, mgr, max_buckets=120, bucket_width=30):
        self.mgr = mgr
        self.state = mgr.state
        self.max_buckets = max_buckets  # how many buckets to keep
        self.bucket_width = bucket_width  # how many seconds wide a bucket is
        self.view_span = "%s minutes" % int(max_buckets * bucket_width / 60.0)
        self.buckets = [{}]
        self.quiet = False  # True once the "no traffic" alert has been sent
        self.state.seen_referers = set()
        self.site_names = self.mgr.load_config_list("site names", "main", "site-names")
        self.ignore_sites = self.mgr.load_config_list("sites to ignore", "main", "ignore-sites")
        self.mgr.reactor.callLater(bucket_width, self.gc)
        mgr.log.debug("Squid stats monitor installed.")

    def add(self, line):
        """
        Add a log line to the stats collection.

        Raises ValueError if the line doesn't have exactly the fields of the
        chatterbox logformat, so the caller can report it instead of failing
        later on a missing key.
        """
        values = line.split(None, len(self.FIELD_NAMES) - 1)
        if len(values) != len(self.FIELD_NAMES):
            raise ValueError("expected %d fields, got %d"
                             % (len(self.FIELD_NAMES), len(values)))
        entry = dict(zip(self.FIELD_NAMES, values))
        bucket = self.buckets[-1]
        bucket['hits'] = bucket.get('hits', 0) + 1
        self.examine(entry)

    def examine(self, entry):
        """
        Examine a new log entry for interesting things; usually, to extract
        the bits we need to check later.

        Calls each examine_<name> method with the entry; any truthy result
        is appended to the current bucket under <name>.
        """
        bucket = self.buckets[-1]
        for name, method in self.get_methods('examine_'):
            result = method(entry)
            if result:
                bucket.setdefault(name, []).append(result)

    def check(self):
        """
        Check the stats collection for any interesting conditions.

        Calls each check_<name> method with everything stashed under <name>
        across all buckets, oldest first.

        NOTE(review): nothing in this class schedules check() yet; gc() only
        rotates buckets.
        """
        for name, method in self.get_methods('check_'):
            entries = [e for bucket in self.buckets for e in bucket.get(name, [])]
            method(entries)

    def examine_referers(self, entry):
        "Stash (referer, url) for off-site referers of successful requests."
        if entry['referer'] in ('-', ''):
            return
        if entry['status'] in ('403', '404', '410', '301', '302', '303', '307'):
            return
        referer_site = urlsplit(entry['referer']).hostname
        if referer_site in self.site_names or referer_site in self.ignore_sites:
            return
        # TODO: canonicalise
        return (entry['referer'], entry['url'])

    def check_referers(self, stashed_entries):
        "Announce (referer, url) pairs seen more than twice in the window."
        for referer_pair, count in self.count(stashed_entries):
            if count > 2:
                self.mgr.channels.say("Hey, %s is giving %s some love." % referer_pair)
        # TODO: remember announced pairs in self.state.seen_referers so they
        # aren't repeated on every check.

    def examine_status(self, entry):
        "Stash the URL of any request that got a 5xx response."
        if entry['status'].startswith('5'):
            return entry['url']

    def check_status(self, stashed_entries):
        """
        Look for unusual status codes.

        Alerts when more than a quarter of a reasonably busy window (over 12
        hits) were 5xx; otherwise names individual URLs with more than two.
        """
        server_err_urls = stashed_entries
        total_hits = sum(b.get('hits', 0) for b in self.buckets)
        if total_hits > 12 and len(server_err_urls) > total_hits * .25:
            self.mgr.channels.say("Uh, oh; %s of the last %s hits were 5xx errors!" % (
                len(server_err_urls), total_hits))
        else:
            for url, count in self.count(server_err_urls):
                if count > 2:
                    self.mgr.channels.say("%s seems to be getting some 5xx errors recently." % url)

    def gc(self):
        """
        Garbage collect the stats collection.

        Alerts once when a full window has seen no traffic (and re-arms when
        traffic returns), then starts a new bucket, drops the oldest beyond
        max_buckets and reschedules itself.
        """
        # Has it been quiet?
        quiet = (len(self.buckets) >= self.max_buckets
                 and not any(b.get('hits', 0) for b in self.buckets))
        if quiet and not self.quiet:
            self.mgr.channels.alert(
                "Hmm, not seeing any traffic in a while.")
        self.quiet = quiet
        self.buckets.append({})
        if len(self.buckets) > self.max_buckets:
            self.buckets.pop(0)
        self.mgr.reactor.callLater(self.bucket_width, self.gc)

    def get_methods(self, prefix):
        "Return (suffix, bound method) pairs for this class's methods named prefix + suffix."
        return [(name[len(prefix):], getattr(self, name))
                for name in dir(self.__class__) if name.startswith(prefix)]

    @staticmethod
    def count(lyst):
        "Given a list of items, return a list of (item, count)"
        c = defaultdict(int)
        for i in lyst:
            c[i] += 1
        return c.items()
############################################################################## | |
def main(configfile):
    """
    Load the configuration, set up logging and run Chatterbox.

    Does not return until the reactor stops. Fatal startup problems are
    reported through error(), which exits the process.
    """
    # load config
    try:
        config = ConfigParser.SafeConfigParser(
            {
                'log-level': 'INFO',
                'use-gzip': 'True',
                'site-names': '',
                'ignore-sites': '',
                'admins': '',
            }
        )
        config.add_section('main')
        config.read(configfile)
        log_level = config.get("main", "log-level").strip().upper()
    except ConfigParser.Error as why:
        error("Configuration file: %s" % why)  # error() appends the newline
    # start logging
    log = logging.getLogger()
    try:
        hdlr = StreamHandler(sys.stderr)
    except IOError as why:
        error("Can't open log file (%s)" % why)
    formatter = logging.Formatter('%(asctime)s| Chatterbox %(levelname)s: %(message)s', '%Y/%m/%d %H:%M:%S')
    hdlr.setFormatter(formatter)
    log.addHandler(hdlr)
    # Resolve the level name through logging's public constants (DEBUG, INFO,
    # ...); logging._levelNames is private and no longer exists in Python 3.
    level = getattr(logging, log_level, None)
    if not isinstance(level, int):
        level = logging.INFO
    log.setLevel(level)
    observer = twisted_log.PythonLoggingObserver()
    observer.start()
    # run
    try:
        mgr = Chatterbox(reactor, config, log)
        # we ignore SIGTERM because Squid will close the log FH, which gives
        # us a much cleaner signal that we're to shut down.
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        mgr.start()
    except ConfigParser.Error as why:
        error("Configuration file: %s" % why)
    except (KeyboardInterrupt, SystemExit):
        # Let deliberate interrupts/exits through instead of calling them
        # "Unknown error."
        raise
    except Exception as why:
        if log_level != 'DEBUG':
            error(why)
        else:
            raise
    except BaseException:
        if log_level != 'DEBUG':
            error("Unknown error.")
        else:
            raise
    finally:
        # clean up logging -- also when error() exits via SystemExit
        hdlr.flush()
        hdlr.close()
        logging.shutdown()
def error(msg):
    """
    Report a fatal problem on stderr and exit with status 1.

    Something really bad has happened; should only be used during startup.
    """
    report = "CHAT FATAL: %s\n" % msg
    sys.stderr.write(report)
    raise SystemExit(1)
class AttrDict(dict):
    "A dict whose keys can also be read and written as attributes."
    def __init__(self, *args, **kwargs):
        super(AttrDict, self).__init__(*args, **kwargs)
        # Point the instance namespace at the mapping itself, so d.key and
        # d['key'] share the same storage.
        self.__dict__ = self
if __name__ == '__main__':
    # Expect exactly one argument: the config file (Squid passes it as the
    # "log path" of the daemon: access_log).
    if len(sys.argv) < 2:
        sys.stderr.write("USAGE: %s config_filename\n" % sys.argv[0])
        sys.exit(1)
    conf = sys.argv[1]
    if not os.path.exists(conf):
        error("Can't find config file %s." % conf)
    main(conf)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[main] | |
log-level = DEBUG | |
statefile = /tmp/foo | |
site-names = www.example.com | |
ignore-sites = www.google.com www.google.it www.google.de www.google.com.au www.google.fr | |
www.google.co.in www.google.com.tr www.google.at www.google.be www.google.ae www.google.ca | |
www.google.ch www.google.co www.google.cz www.google.dk www.google.ie www.google.nl | |
www.google.pl www.google.pt | |
bing.com bing.com.au bing.co.uk | |
translate.googleusercontent.com webcache.googleusercontent.com | |
bit.ly stumbleupon.com del.icio.us delicious.com | |
validator.w3.org | |
[twitter] | |
username = user | |
password = pass | |
admins = @example |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment