#coding: utf-8
"""Pylogstat

Usage:
    pylogstat.py parse FILE [--db DB] [--host HOST] [--port PORT] [--maxline MAXLINE]
    pylogstat.py aggregate (--db DB) [--host HOST] [--port PORT] [--start START] [--end END] [--delta DELTA]
    pylogstat.py index (-c | -d) (--db DB) [--host HOST] [--port PORT]

Options:
    -h --help           Show this screen.
    -v --version        Show version.
    -c                  Create index(es).
    -d                  Drop index(es).
    --db=DB             Specify the database.
    --host=HOST         Specify the server host.
    --port=PORT         Specify the server port.
    --maxline=MAXLINE   Specify the number of lines to parse in one shot.
    --start=START       Specify the start datetime of the query.
    --end=END           Specify the end datetime of the query.
    --delta=DELTA       Specify the datetime interval of the query.
"""
__author__ = 'chendahui'
__author_email__ = '[email protected]'
from datetime import datetime, timedelta, date
from itertools import islice
from docopt import docopt
import logging
import os
import re
import fcntl
import struct
import socket
import time
import bson
try:
    from pymongo import MongoClient as Connection
except ImportError:
    from pymongo import Connection
def get_logger(level=logging.WARNING, logpath='pylogstat.log'):
    logger = logging.getLogger('pylogstat')
    logger.setLevel(level)
    # file handler
    handler = logging.FileHandler(logpath)
    handler.setLevel(logging.WARNING)
    # console handler
    console = logging.StreamHandler()
    console.setLevel(logging.ERROR)
    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(name)s - %(message)s')
    handler.setFormatter(formatter)
    console.setFormatter(formatter)
    logger.addHandler(handler)
    logger.addHandler(console)
    return logger
logger = get_logger()
def get_today():
    return datetime.combine(date.today(), datetime.min.time())
def get_tomorrow():
    delta = timedelta(days=1)
    return datetime.combine(get_today() + delta, datetime.min.time())
def get_db(host=None, port=None):
    """Return a mongodb connection object"""
    #FIXME: how to deal with authentication?
    conn = Connection(host, port)
    return conn
def get_lan_ip():
    """Return the LAN IP, falling back to common interface names."""
    def get_ip(ifname):
        """ifname is something like eth0, em1, etc."""
        # 0x8915 is SIOCGIFADDR, the Linux ioctl that returns an
        # interface's address.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        return socket.inet_ntoa(fcntl.ioctl(
            s.fileno(), 0x8915,
            struct.pack('256s', ifname[:15]))[20:24])
    ip = socket.gethostbyname(socket.gethostname())
    if ip.startswith('127.') and os.name != 'nt':
        interfaces = ['em0', 'em1', 'eth0', 'eth1']
        for ifname in interfaces:
            try:
                ip = get_ip(ifname)
            except IOError:
                pass
    return ip
class Parser(object):
    """Parse a log file and insert the events into mongodb"""
    def __init__(self, db, logpath, loghost):
        """
        :param logpath: the log file path
        :param loghost: the host where the log file lives
        :param log: the collection that stores the parsed logs
        :param seek: the collection that records the last position
        :param key: string that identifies the log file
        """
        self.db = db
        self.logpath = logpath
        self.loghost = loghost
        self.log = self.db['log']
        self.seek = self.db['seek']
        self.key = '%s/%s' % (self.loghost, self.logpath)
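        # e.g. with loghost '10.0.1.2' and logpath '/var/log/nginx/access.log'
        # (both hypothetical), key becomes '10.0.1.2//var/log/nginx/access.log'.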
    def read_seek(self, seek_coll):
        """Read the previous position from mongodb"""
        query = seek_coll.find_one({'file': self.key},
                                   {'seek': 1, '_id': 0})
        if query:
            pos = int(query.get('seek'))
        else:
            pos = 0
        return pos
    def write_seek(self, pos):
        """Save the current position to mongodb on the first run"""
        last_saved_time = datetime.now().isoformat()
        record = {
            '_id': bson.ObjectId(),
            'file': self.key,
            'seek': pos,
            'last': last_saved_time,
        }
        result = self.seek.insert(record)
        return result
    def update_seek(self, pos):
        """Update the seek position"""
        last_saved_time = datetime.now().isoformat()
        update = {
            'seek': pos,
            'last': last_saved_time,
        }
        result = self.seek.update({'file': self.key},
                                  {'$set': update})
        return result
    def process(self, fp, pos, maxline=2000):
        """Read N lines at a time, starting from the saved position"""
        fp.seek(pos)
        while True:
            next_n_lines = list(islice(fp, maxline))
            logger.info('Current position is %s', fp.tell())
            if not next_n_lines:
                logger.warning('END OF FILE')
                break
            events = []
            for line in next_n_lines:
                event = self.filter(line)
                if event:
                    events.append(event)
            if events:
                self.log.insert(events)
            if pos:
                # update the existing seek record
                self.update_seek(fp.tell())
            else:
                # first run: insert a new seek record
                pos = fp.tell()
                self.write_seek(pos)
        return True
    def filter(self, log):
        """Use a regex to parse one line of nginx access.log"""
        # nginx access log re pattern
        ngx = (r'((?P<ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|\-) - ((.+?)|(\-)) '
               r'\[(?P<datetime>\d{2}\/[a-z]{3}\/\d{4}:\d{2}:\d{2}:\d{2} '
               r'(\+|\-)\d{4})\] ((\"(?P<method>\w+?) )(?P<url>.+?) (http.+?")|(\".+?")) '
               r'(?P<status>\d{3}) (?P<bytes>\d+) (["](?P<referer>(\-)|(.+?))["]) '
               r'(["](?P<ua>(\-)|(.+?))["]) (["](?P<forward>(\-)|(.+?))["]) '
               r'(?P<rq_time>(\-)|(.+?)) (?P<rsp_time>(\-$)|(.+?$))')
        #FIXME: a few oddball log lines are still not handled; since they are
        # extremely rare, they are ignored for now.
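        # Illustrative line this pattern matches (hypothetical values, assuming
        # the combined format plus X-Forwarded-For, request time and response
        # time):
        #   1.2.3.4 - - [18/Feb/2014:13:55:00 +0800] "GET /api/users/42
        #   HTTP/1.1" 200 512 "-" "curl/7.30.0" "-" 0.003 0.003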
        nginx_pattern = re.compile(ngx, re.IGNORECASE)
        result = re.search(nginx_pattern, log)
        logger.info('The log is %s', log)
        if result:
            event = result.groupdict()
            # pop items with meaningless values to save space
            for k, v in list(event.items()):
                if v is None or v in ('-', '0', '0.000'):
                    event.pop(k)
            event['_id'] = bson.ObjectId()
            # NOTE: assumes the logs are written in the +0800 (CST) timezone
            event['datetime'] = datetime.strptime(event['datetime'],
                                                  '%d/%b/%Y:%H:%M:%S +0800')
            event['status'] = int(event['status'])
            if event.get('bytes'):
                event['bytes'] = int(event.get('bytes'))
            if event.get('rq_time'):
                event['rq_time'] = float(event.get('rq_time'))
            if event.get('rsp_time'):
                try:
                    event['rsp_time'] = float(event.get('rsp_time'))
                except ValueError:
                    logger.warning('WARNING - Log format is invalid => %s', log)
            # if the url starts with '/api', add an 'api' field.
            try:
                if event.get('url').startswith('/api'):
                    # qm => question mark
                    qm = re.compile(r'\?.*')
                    word = re.compile(r'\w{32}')
                    digit = re.compile(r'\d{1,11}')
                    api = event['url']
                    api = qm.sub('', api)
                    # don't change the order
                    api = word.sub('[:name]', api)
                    api = digit.sub('[:id]', api)
                    event['api'] = api.strip()
            except AttributeError:
                logger.warning('WARNING BAD_URL %s', log)
        else:
            #FIXME: better one?
            logger.warning('WARNING - Not captured: %s', log)
            event = None
        logger.info(event)
        return event
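# Illustrative result of Parser.filter for the hypothetical sample line above
# ('-' fields are popped, the url is templated into the 'api' field):
#   {'_id': ObjectId('...'), 'ip': '1.2.3.4', 'method': 'GET',
#    'url': '/api/users/42', 'api': '/api/users/[:id]', 'status': 200,
#    'bytes': 512, 'ua': 'curl/7.30.0', 'rq_time': 0.003, 'rsp_time': 0.003,
#    'datetime': datetime(2014, 2, 18, 13, 55)}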
class Aggregation(object):
    """Aggregation from mongodb"""
    # API relevant.
    def __init__(self, conn, log_db):
        self.conn = conn
        self.log = self.conn[log_db]['log']
        self.stat = self.conn['stat']
    def api_map_count(self, start, end, size=None):
        """Count hits grouped by API.
        :param start: datetime, beginning of the query window.
        :param end: datetime, end of the query window.
        :param size: integer, default None; limit the quantity of output.
        """
        pipeline = [
            {'$match': {'datetime': {'$gt': start, '$lte': end},
                        'api': {'$exists': True}}},
            {'$group': {'_id': '$api', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
        ]
        if size:
            pipeline.append({'$limit': int(size)})
        return self.log.aggregate(pipeline)
    def api_total_count(self, start, end):
        """Count urls matching the pattern '/^\/api/'"""
        return self.log.find({'datetime': {'$gt': start, '$lte': end},
                              'api': {'$exists': True}}).count()
    def status_map_count(self, start, end):
        """Status code map count, API only"""
        result = self.log.aggregate([
            {'$match': {'datetime': {'$gt': start, '$lte': end},
                        'api': {'$exists': True}}},
            {'$group': {'_id': '$status', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}}
        ])
        return result
    def response_time_map_count(self, start, end, size=100):
        """Return the slowest requests sorted by response time, API only"""
        result = self.log.find(
            {'datetime': {'$gt': start, '$lte': end}, 'api': {'$exists': True}}
        ).sort('rsp_time', -1).limit(int(size))
        return {'result': list(result)}
    def ip_map_count_api(self, start, end, size=None):
        """IP counts over API requests only"""
        pipeline = [
            {'$match': {'datetime': {'$gt': start, '$lte': end},
                        'api': {'$exists': True}}},
            {'$group': {'_id': '$ip', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
        ]
        if size:
            pipeline.append({'$limit': int(size)})
        return self.log.aggregate(pipeline)
    # IP relevant
    def ip_map_count(self, start, end, size=None):
        """Top IPs sorted by requests (all urls)"""
        pipeline = [
            {'$match': {'datetime': {'$gt': start, '$lte': end}}},
            {'$group': {'_id': '$ip', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
        ]
        if size:
            pipeline.append({'$limit': int(size)})
        return self.log.aggregate(pipeline)
    def ip_map_count_non_api(self, start, end, size=None):
        """Top IPs sorted by non-api urls"""
        pipeline = [
            {'$match': {'datetime': {'$gt': start, '$lte': end},
                        'api': {'$exists': False}}},
            {'$group': {'_id': '$ip', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
        ]
        if size:
            pipeline.append({'$limit': int(size)})
        return self.log.aggregate(pipeline)
    # UserAgent relevant
    def ua_map_count(self, start, end, size=None):
        pipeline = [
            {'$match': {'datetime': {'$gt': start, '$lte': end}}},
            {'$group': {'_id': '$ua', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
        ]
        if size:
            pipeline.append({'$limit': int(size)})
        return self.log.aggregate(pipeline)
    # Referer relevant
    def referer_map_count(self, start, end, size=None):
        pipeline = [
            {'$match': {'datetime': {'$gt': start, '$lte': end}}},
            {'$group': {'_id': '$referer', 'count': {'$sum': 1}}},
            {'$sort': {'count': -1}},
        ]
        if size:
            pipeline.append({'$limit': int(size)})
        return self.log.aggregate(pipeline)
    # Bandwidth
    def bytes_count(self, start, end):
        # older pymongo returns aggregation results as a dict with a
        # 'result' key rather than a cursor
        _result = self.log.aggregate([
            {'$match': {'datetime': {'$gt': start, '$lte': end}}},
            {'$group': {'_id': None, 'total': {'$sum': '$bytes'}}}
        ])
        if _result.get('result'):
            result = float(_result.get('result')[0].get('total'))
        else:
            result = None
        return result
    def aggregate(self, start=None, delta=None, end=None):
        """Query from mongodb, and save the results
        :param start: datetime.datetime object
        :param delta: datetime.timedelta object
        :param end: datetime.datetime object
        e.g.:
            start = datetime.datetime(2014, 2, 18)
            delta = datetime.timedelta(hours=1)
        """
        # Default arguments are evaluated once at definition time, so the
        # "current day" defaults must be computed here instead.
        start = start or get_today()
        delta = delta or timedelta(days=1)
        end = end or get_tomorrow()
        # stat is the name of the db that stores the aggregation results.
        stat = self.conn['stat']
        # <, NOT <=
        while start < end:
            begin = time.time()
            logger.info('time between %s and %s',
                        start.isoformat(), (start + delta).isoformat())
            apis = {
                'time': start,
                'api_map_count': self.api_map_count(start, start + delta),
                'api_total_count': self.api_total_count(start, start + delta),
                'status_map_count': self.status_map_count(start, start + delta),
                'response_time_map_count': self.response_time_map_count(
                    start, start + delta),
                'ip_map_count_api': self.ip_map_count_api(start, start + delta,
                                                          size=100)
            }
            others = {
                'time': start,
                'ip_map_count': self.ip_map_count(start, start + delta, size=100),
                'ip_map_count_non_api': self.ip_map_count_non_api(
                    start, start + delta, size=100),
                'ua_map_count': self.ua_map_count(start, start + delta, size=100),
                'referer_map_count': self.referer_map_count(start, start + delta,
                                                            size=100),
                'bytes_count': self.bytes_count(start, start + delta)
            }
            stat['apis'].insert(apis)
            stat['others'].insert(others)
            start += delta
            logger.info('Consumed time: %s', time.time() - begin)
        logger.info('Query finished.')
class Index(object):
    """Mongodb index operations"""
    def __init__(self, db):
        self.db = db
        self.log = self.db['log']
    def create(self, datetime_only=False):
        start = time.time()
        self.log.ensure_index([('datetime', 1)])
        logger.info('Created index {datetime: 1}, consumed time: %s',
                    time.time() - start)
        if datetime_only:
            return
        for field in ('api', 'status', 'rsp_time', 'ip', 'ua', 'referer'):
            start = time.time()
            self.log.ensure_index([('datetime', 1), (field, 1)])
            logger.info('Created index {datetime: 1, %s: 1}, consumed time: %s',
                        field, time.time() - start)
    def drop(self):
        logger.warning('This operation will drop all the indexes of the '
                       'current collection.')
        self.log.drop_indexes()
def main():
    args = docopt(__doc__, version='Pylogstat 0.1')
    loghost = get_lan_ip()
    host = args.get('--host') or 'localhost'
    port = int(args.get('--port')) if args.get('--port') else None
    conn = get_db(host=host, port=port)
    db = conn[args.get('--db')]
    maxline = int(args.get('--maxline') or 2000)
    if args.get('parse'):
        logpath = os.path.abspath(args.get('FILE'))
        parser = Parser(db, logpath, loghost)
        try:
            pos = parser.read_seek(db['seek'])
            fp = open(logpath, 'r')
            parser.process(fp, pos, maxline)
            fp.close()
        except IOError:
            logger.error('logfile does not exist.')
    elif args.get('aggregate'):
        # --start/--end are assumed to be ISO-8601 datetimes
        # (e.g. 2014-02-18T00:00:00) and --delta a number of hours;
        # the original left this parsing unimplemented.
        def parse_dt(value, default):
            if value:
                return datetime.strptime(value, '%Y-%m-%dT%H:%M:%S')
            return default
        start = parse_dt(args.get('--start'), get_today())
        end = parse_dt(args.get('--end'), get_tomorrow())
        if args.get('--delta'):
            delta = timedelta(hours=int(args.get('--delta')))
        else:
            delta = timedelta(days=1)
        ag = Aggregation(conn, args.get('--db'))
        ag.aggregate(start=start, delta=delta, end=end)
    elif args.get('index'):
        index = Index(db)
        if args.get('-c'):
            index.create(datetime_only=True)
        elif args.get('-d'):
            index.drop()
if __name__ == '__main__':
    main()