#coding: utf-8
"""Pylogstat

Usage:
    pylogstat.py parse FILE [--db DB] [--host HOST] [--port PORT] [--maxline MAXLINE]
    pylogstat.py aggregate (--db DB) [--host HOST] [--port PORT] [--start START] [--end END] [--delta DELTA]
    pylogstat.py index (-c | -d) (--db DB) [--host HOST] [--port PORT]

Options:
    -h --help            Show this screen.
    -v --version         Show version.
    -c                   Create index(es).
    -d                   Drop index(es).
    --db=DB              Specify the database.
    --host=HOST          Specify the server host.
    --port=PORT          Specify the server port.
    --maxline=MAXLINE    Specify the number of lines to parse in one batch.
    --start=START        Specify the start datetime to query.
    --end=END            Specify the end datetime to query.
    --delta=DELTA        Specify the datetime interval to query.
"""

__author__ = 'chendahui'
__author_email__ = '[email protected]'

from datetime import datetime, timedelta, date
from itertools import islice

from docopt import docopt

import logging
import os
import re
import sys
import fcntl
import struct
import socket
import time

import bson

try:
    # pymongo >= 2.4 renamed Connection to MongoClient
    from pymongo import MongoClient as Connection
except ImportError:
    from pymongo import Connection

def get_logger(level=logging.WARNING, logpath='pylogstat.log'):
    logger = logging.getLogger('pylogstat')
    logger.setLevel(level)
    # file handler
    handler = logging.FileHandler(logpath)
    handler.setLevel(logging.WARNING)
    # console handler
    console = logging.StreamHandler()
    console.setLevel(logging.ERROR)
    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(name)s - %(message)s')
    handler.setFormatter(formatter)
    console.setFormatter(formatter)
    logger.addHandler(handler)
    logger.addHandler(console)
    return logger


logger = get_logger()

def get_today():
    """Return today's date at midnight as a datetime."""
    return datetime.combine(date.today(), datetime.min.time())


def get_tomorrow():
    """Return tomorrow's date at midnight as a datetime."""
    delta = timedelta(days=1)
    return datetime.combine(get_today() + delta, datetime.min.time())

def get_db(host=None, port=None):
    """Return a MongoDB connection object."""
    #FIXME: how to deal with authentication?
    conn = Connection(host, port)
    return conn

def get_lan_ip():
    """Return the LAN IP of this machine, probing common interfaces if the
    hostname resolves to a loopback address."""
    def get_ip(ifname):
        """ifname is an interface name such as eth0, em1, etc."""
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # 0x8915 is SIOCGIFADDR on Linux
        return socket.inet_ntoa(fcntl.ioctl(
            s.fileno(), 0x8915,
            struct.pack('256s', ifname[:15]))[20:24])

    ip = socket.gethostbyname(socket.gethostname())
    if ip.startswith('127.') and os.name != 'nt':
        interfaces = ['em0', 'em1', 'eth0', 'eth1']
        for ifname in interfaces:
            try:
                ip = get_ip(ifname)
            except IOError:
                pass
    return ip

class Parser(object):
    """Parse a log file and insert the events into MongoDB."""

    def __init__(self, db, logpath, loghost):
        """
        :param logpath: the log file path
        :param loghost: the host where the log file lives
        :param log: the collection that stores the parsed logs
        :param seek: the collection that records the last read position
        :param key: string that identifies the log file
        """
        self.db = db
        self.logpath = logpath
        self.loghost = loghost
        self.log = self.db['log']
        self.seek = self.db['seek']
        self.key = '%s/%s' % (self.loghost, self.logpath)
    def read_seek(self, seek_coll):
        """Read the previously saved position from MongoDB."""
        query = seek_coll.find_one({'file': self.key},
                                   {'seek': 1, '_id': 0})
        if query:
            pos = int(query.get('seek'))
        else:
            pos = 0
        return pos
    def write_seek(self, pos):
        """Save the current position to MongoDB (first-time insert)."""
        record = {
            '_id': bson.ObjectId(),
            'file': self.key,
            'seek': pos,
            'last': datetime.now().isoformat()
        }
        result = self.seek.insert(record)
        return result
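    # A seek document, as written by write_seek above, looks roughly like
    # this (values are illustrative):
    #   {'file': '10.1.1.10//var/log/nginx/access.log',
    #    'seek': 123456,
    #    'last': '2014-02-18T13:55:00.000000'}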

    def update_seek(self, pos):
        """Update the saved seek position."""
        update = {
            'seek': pos,
            'last': datetime.now().isoformat(),
        }
        result = self.seek.update({'file': self.key},
                                  {'$set': update})
        return result
    def process(self, fp, pos, maxline=2000):
        """Read up to `maxline` lines at a time and insert the parsed events."""
        # seek from the start of the file; fp has just been opened
        fp.seek(pos)
        while True:
            next_n_lines = list(islice(fp, maxline))
            logger.info('Current position is %s', fp.tell())
            if not next_n_lines:
                logger.warning('END OF FILE')
                break
            events = []
            for line in next_n_lines:
                event = self.filter(line)
                if event:
                    events.append(event)
            # pymongo raises InvalidOperation on an empty insert, so guard it
            if events:
                self.log.insert(events)
            if pos:
                # update the existing seek record
                self.update_seek(fp.tell())
            else:
                # insert a new seek record
                pos = fp.tell()
                self.write_seek(pos)
        return True
    def filter(self, log):
        """Parse one nginx access.log line; return an event dict or None."""
        # nginx access log re pattern
        ngx = (r'((?P<ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|\-) - ((.+?)|(\-)) '
               r'\[(?P<datetime>\d{2}\/[a-z]{3}\/\d{4}:\d{2}:\d{2}:\d{2} '
               r'(\+|\-)\d{4})\] ((\"(?P<method>\w+?) )(?P<url>.+?) (http.+?")|(\".+?")) '
               r'(?P<status>\d{3}) (?P<bytes>\d+) (["](?P<referer>(\-)|(.+?))["]) '
               r'(["](?P<ua>(\-)|(.+?))["]) (["](?P<forward>(\-)|(.+?))["]) '
               r'(?P<rq_time>(\-)|(.+?)) (?P<rsp_time>(\-$)|(.+?$))')
        #FIXME: a few odd log lines are still not handled; since they are
        # extremely rare, ignore them for now.
        nginx_pattern = re.compile(ngx, re.IGNORECASE)
        result = nginx_pattern.search(log)
        logger.info('The log is %s', log)
        if result:
            event = result.groupdict()
            # pop items with meaningless values to save space
            # ('-', '0' and '0.000' are the "empty" markers in this log format)
            for k, v in list(event.items()):
                if v in ('-', '0', '0.000'):
                    event.pop(k)
            event['_id'] = bson.ObjectId()
            # NOTE: the timezone offset is hard-coded to +0800 here
            event['datetime'] = datetime.strptime(event['datetime'],
                                                  '%d/%b/%Y:%H:%M:%S +0800')
            event['status'] = int(event['status'])
            if event.get('bytes'):
                event['bytes'] = int(event.get('bytes'))
            if event.get('rq_time'):
                event['rq_time'] = float(event.get('rq_time'))
            if event.get('rsp_time'):
                try:
                    event['rsp_time'] = float(event.get('rsp_time'))
                except ValueError:
                    logger.warning('WARNING - Log format is invalid => %s', log)
            # if the url starts with '/api', add an 'api' field
            try:
                if event.get('url').startswith('/api'):
                    # qm => question mark
                    qm = re.compile(r'\?.*')
                    word = re.compile(r'\w{32}')
                    digit = re.compile(r'\d{1,11}')
                    api = event['url']
                    api = qm.sub('', api)
                    # don't change the order
                    api = word.sub('[:name]', api)
                    api = digit.sub('[:id]', api)
                    event['api'] = api.strip()
            except AttributeError:
                logger.warning('WARNING BAD_URL %s', log)
        else:
            #FIXME: better one?
            logger.warning('WARNING - Not captured: %s', log)
            event = None
        logger.info(event)
        return event
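
    # A minimal sketch of what `filter` produces, with a made-up log line
    # that fits the custom log_format the pattern above expects (ip, user,
    # time, request, status, bytes, referer, user-agent, x-forwarded-for,
    # request time, response time):
    #
    #   line = ('1.2.3.4 - - [18/Feb/2014:13:55:00 +0800] '
    #           '"GET /api/users/42?page=1 HTTP/1.1" 200 512 '
    #           '"-" "curl/7.30.0" "-" 0.001 0.023')
    #   parser.filter(line)
    #   # => {'ip': '1.2.3.4', 'method': 'GET',
    #   #     'url': '/api/users/42?page=1', 'status': 200, 'bytes': 512,
    #   #     'api': '/api/users/[:id]', 'rq_time': 0.001,
    #   #     'rsp_time': 0.023, ...}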


class Aggregation(object):
    """Run aggregation queries against MongoDB."""

    def __init__(self, conn, log_db):
        self.conn = conn
        self.log = self.conn[log_db]['log']
        self.stat = self.conn['stat']
    # API relevant.
    def api_map_count(self, start, end, size=None):
        """Aggregate request counts per API.

        :param start: start datetime of the query window
        :param end: end datetime of the query window
        :param size: integer, default None; limit the quantity of output
        """
        pipeline = [
            {'$match': {'datetime': {'$gt': start, '$lte': end},
                        'api': {'$exists': True}}},
            {'$group': {'_id': '$api', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}}
        ]
        if size:
            pipeline.append({'$limit': int(size)})
        result = self.log.aggregate(pipeline)
        return result
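    # With pymongo's legacy aggregate() (the pre-3.0 command form), the return
    # value is a dict rather than a cursor, shaped like (values illustrative):
    #   {'ok': 1.0, 'result': [{'_id': '/api/users/[:id]', 'count': 42}, ...]}
    # bytes_count below relies on this 'result' key.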

    def api_total_count(self, start, end):
        """Count urls matching the pattern '^/api'."""
        return self.log.find({'datetime': {'$gt': start, '$lte': end},
                              'api': {'$exists': True}}).count()
    def status_map_count(self, start, end):
        """Status code map count, API requests only."""
        result = self.log.aggregate([
            {'$match': {'datetime': {'$gt': start, '$lte': end},
                        'api': {'$exists': True}}},
            {'$group': {'_id': '$status', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}}
        ])
        return result
    def response_time_map_count(self, start, end, size=100):
        """Return the slowest API requests, wrapped to mimic the legacy
        aggregate() result shape."""
        result = self.log.find(
            {'datetime': {'$gt': start, '$lte': end}, 'api': {'$exists': True}}
        ).sort('rsp_time', -1).limit(int(size))
        return {'result': [i for i in result]}
    def ip_map_count_api(self, start, end, size=None):
        """Request counts per IP, API requests only."""
        pipeline = [
            {'$match': {'datetime': {'$gt': start, '$lte': end},
                        'api': {'$exists': True}}},
            {'$group': {'_id': '$ip', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
        ]
        if size:
            pipeline.append({'$limit': int(size)})
        return self.log.aggregate(pipeline)
    # IP relevant
    def ip_map_count(self, start, end, size=None):
        """Top IPs sorted by request count (all urls)."""
        pipeline = [
            {'$match': {'datetime': {'$gt': start, '$lte': end}}},
            {'$group': {'_id': '$ip', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}}
        ]
        if size:
            pipeline.append({'$limit': int(size)})
        return self.log.aggregate(pipeline)
    def ip_map_count_non_api(self, start, end, size=None):
        """Top IPs sorted by request count, non-API urls only."""
        pipeline = [
            {'$match': {'datetime': {'$gt': start, '$lte': end},
                        'api': {'$exists': False}}},
            {'$group': {'_id': '$ip', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}}
        ]
        if size:
            pipeline.append({'$limit': int(size)})
        return self.log.aggregate(pipeline)
    # UserAgent relevant
    def ua_map_count(self, start, end, size=None):
        """Request counts per user agent."""
        pipeline = [
            {'$match': {'datetime': {'$gt': start, '$lte': end}}},
            {'$group': {'_id': '$ua', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
        ]
        if size:
            pipeline.append({'$limit': int(size)})
        return self.log.aggregate(pipeline)
    # Referer relevant
    def referer_map_count(self, start, end, size=None):
        """Request counts per referer."""
        pipeline = [
            {'$match': {'datetime': {'$gt': start, '$lte': end}}},
            {'$group': {'_id': '$referer', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
        ]
        if size:
            pipeline.append({'$limit': int(size)})
        return self.log.aggregate(pipeline)
    # Bandwidth
    def bytes_count(self, start, end):
        """Total bytes served in the window, or None if there was no traffic."""
        _result = self.log.aggregate([
            {'$match': {'datetime': {'$gt': start, '$lte': end}}},
            {'$group': {'_id': None, 'total': {'$sum': '$bytes'}}}
        ])
        if _result.get('result'):
            result = float(_result.get('result')[0].get('total'))
        else:
            result = None
        return result
    def aggregate(self, start=None, delta=timedelta(days=1), end=None):
        """Query MongoDB window by window and save the results.

        :param start: datetime.datetime object
        :param delta: datetime.timedelta object
        :param end: datetime.datetime object

        e.g.:
            start = datetime.datetime(2014, 2, 18)
            delta = datetime.timedelta(hours=1)
        """
        # Default to [today, tomorrow). Computed here, not in the signature,
        # so the defaults are not frozen at import time.
        if start is None:
            start = get_today()
        if end is None:
            end = get_tomorrow()
        # stat is the name of the db that stores the aggregation results.
        stat = self.conn['stat']
        # <, NOT <=
        while start < end:
            begin = time.time()
            logger.info('time between %s and %s',
                        start.isoformat(), (start + delta).isoformat())
            apis = {
                'time': start,
                'api_map_count': self.api_map_count(start, start + delta),
                'api_total_count': self.api_total_count(start, start + delta),
                'status_map_count': self.status_map_count(start, start + delta),
                'response_time_map_count': self.response_time_map_count(
                    start, start + delta),
                'ip_map_count_api': self.ip_map_count_api(start, start + delta,
                                                          size=100)
            }
            others = {
                'time': start,
                'ip_map_count': self.ip_map_count(start, start + delta,
                                                  size=100),
                'ip_map_count_non_api': self.ip_map_count_non_api(
                    start, start + delta, size=100),
                'ua_map_count': self.ua_map_count(start, start + delta,
                                                  size=100),
                'referer_map_count': self.referer_map_count(start, start + delta,
                                                            size=100),
                'bytes_count': self.bytes_count(start, start + delta)
            }
            stat['apis'].insert(apis)
            stat['others'].insert(others)
            start += delta
            logger.info('Consumed time: %s', time.time() - begin)
        logger.info('Query finished.')


class Index(object):
    """MongoDB index operations."""

    def __init__(self, db):
        self.db = db
        self.log = self.db['log']

    def create(self, datetime_only=False):
        start = time.time()
        self.log.ensure_index([('datetime', 1)])
        logger.info('Created index {datetime: 1} consumed time: %s',
                    time.time() - start)
        if not datetime_only:
            start = time.time()
            self.log.ensure_index([('datetime', 1), ('api', 1)])
            logger.info('Created index {datetime: 1, api: 1} consumed time: %s',
                        time.time() - start)
            start = time.time()
            self.log.ensure_index([('datetime', 1), ('status', 1)])
            logger.info('Created index {datetime: 1, status: 1} consumed time: %s',
                        time.time() - start)
            start = time.time()
            self.log.ensure_index([('datetime', 1), ('rsp_time', 1)])
            logger.info('Created index {datetime: 1, rsp_time: 1} consumed time: %s',
                        time.time() - start)
            start = time.time()
            self.log.ensure_index([('datetime', 1), ('ip', 1)])
            logger.info('Created index {datetime: 1, ip: 1} consumed time: %s',
                        time.time() - start)
            start = time.time()
            self.log.ensure_index([('datetime', 1), ('ua', 1)])
            logger.info('Created index {datetime: 1, ua: 1} consumed time: %s',
                        time.time() - start)
            start = time.time()
            self.log.ensure_index([('datetime', 1), ('referer', 1)])
            logger.info('Created index {datetime: 1, referer: 1} consumed time: %s',
                        time.time() - start)

    def drop(self):
        logger.warning('This operation will drop all the indexes of the '
                       'current collection.')
        self.log.drop_indexes()
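

# Note: the compound indexes above all lead with 'datetime', presumably so
# that the datetime-window $match stage in each aggregation can be served
# from an index before grouping.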


def main():
    args = docopt(__doc__, version='Pylogstat 0.1')
    loghost = get_lan_ip()
    command = sys.argv[1] if len(sys.argv) > 1 else None
    host = args.get('--host') or 'localhost'
    if args.get('--port'):
        port = int(args.get('--port'))
    else:
        port = None
    conn = get_db(host=host, port=port)
    db = conn[args.get('--db')]
    maxline = int(args.get('--maxline') or 2000)
    args_create = args.get('-c')
    args_drop = args.get('-d')
    if args.get('FILE'):
        # docopt keys positional arguments by their uppercase name
        logpath = os.path.abspath(args.get('FILE'))
    if command == 'parse':
        parser = Parser(db, logpath, loghost)
        try:
            pos = parser.read_seek(db['seek'])
            fp = open(logpath, 'r')
            parser.process(fp, pos, maxline)
            fp.close()
        except IOError:
            logger.error('log file does not exist.')
    elif command == 'aggregate':
        # The date format accepted here is an assumption: the original script
        # declared --start/--end/--delta but never parsed them before use.
        fmt = '%Y-%m-%d'
        start = (datetime.strptime(args.get('--start'), fmt)
                 if args.get('--start') else None)
        end = (datetime.strptime(args.get('--end'), fmt)
               if args.get('--end') else None)
        # --delta is taken as hours (also an assumption)
        delta = (timedelta(hours=int(args.get('--delta')))
                 if args.get('--delta') else timedelta(days=1))
        ag = Aggregation(conn, args.get('--db'))
        ag.aggregate(start=start, delta=delta, end=end)
    elif command == 'index':
        index = Index(db)
        if args_create:
            index.create(datetime_only=True)
        elif args_drop:
            index.drop()


if __name__ == '__main__':
    main()
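
# A possible way to run the parser periodically (illustrative crontab entry;
# the path and database name are placeholders):
#   */5 * * * * python /path/to/pylogstat.py parse /var/log/nginx/access.log --db weblog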