Skip to content

Instantly share code, notes, and snippets.

@dstufft
Created June 3, 2011 12:24
Show Gist options
  • Save dstufft/1006261 to your computer and use it in GitHub Desktop.
Save dstufft/1006261 to your computer and use it in GitHub Desktop.
Reads a tab delimited log file in reverse and generates some stats based on it.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Find the address(es) with the most hits yesterday."""
import csv
import mmap
import os
import sys
from datetime import date, datetime, timedelta
from optparse import OptionParser
# strptime pattern for the timestamp in the first column of each log line.
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
class ReverseFileReader(object):
    """Read the lines of a file from last to first through a memory map.

    Attributes:
        map: read-only ``mmap`` over the whole file, or an empty byte string
            when the file is empty (an empty file cannot be memory-mapped).
        end: offset just past the next line to be produced by
            ``readlines``; ``-1`` once there is nothing left to produce.
    """

    def __init__(self, file):
        """Map *file* for reading and position the cursor at its end.

        Args:
            file: an open file object backed by a real descriptor, i.e.
                ``file.fileno()`` must work.
        """
        fileno = file.fileno()
        if os.fstat(fileno).st_size:
            # ``access=`` is the portable way to ask for a read-only map;
            # the ``prot=`` keyword only exists on Unix.
            self.map = mmap.mmap(fileno, 0, access=mmap.ACCESS_READ)
        else:
            # mmap raises ValueError for a zero-length file, so stand in an
            # empty byte string, which supports the same slicing/rfind calls.
            self.map = b''

        self.end = len(self.map)
        if not self.end:
            # Empty file: there are no lines at all.
            self.end = -1
        elif self.map[self.end - 1:self.end] == b'\n':
            # A final newline terminates the last line; it does not start
            # an additional empty one, so step back over it.
            self.end -= 1

    def readlines(self):
        """Yield each line, newest first, without its trailing newline.

        Lines are yielded as raw bytes sliced from the map (``str`` on
        Python 2). Blank lines inside the file are yielded as empty strings.
        The generator advances ``self.end`` as it goes, so the file can be
        walked only once per instance.
        """
        while self.end != -1:
            # rfind returns -1 when no earlier newline exists, which both
            # makes start + 1 == 0 (beginning of file) and ends the loop.
            start = self.map.rfind(b'\n', 0, self.end)
            yield self.map[start + 1:self.end]
            self.end = start
class LogParser(ReverseFileReader):
    """Count hits per address in a tab-delimited log, read last line first.

    Each non-blank log line must carry a timestamp matching
    ``DATETIME_FORMAT`` in its first column and the address in its third.

    Attributes:
        hits: dict mapping address -> number of matching lines counted.
    """

    def __init__(self, *args, **kwargs):
        """Set up the reader (same arguments as ``ReverseFileReader``)."""
        super(LogParser, self).__init__(*args, **kwargs)
        self.hits = {}

    def parse_log(self, start=datetime.min, end=datetime.max):
        """Tally, per address, the log lines stamped within [start, end].

        Counts are added to ``self.hits`` rather than replacing it.

        Args:
            start: earliest timestamp to count (inclusive).
            end: latest timestamp to count (inclusive).

        Returns:
            ``self.hits``, the address -> count dict.

        Raises:
            ValueError: a line's first column is not in ``DATETIME_FORMAT``.
            IndexError: a non-blank line has fewer than three columns.
        """
        for line in self.readlines():
            if not isinstance(line, str):
                # On Python 3 the memory map hands back bytes, but the csv
                # module only accepts text.
                line = line.decode('utf-8', 'replace')
            row = next(csv.reader([line], dialect='excel-tab'), None)
            if not row:
                # Blank line, e.g. the empty tail after a final newline;
                # indexing into it would raise IndexError.
                continue
            dtime = datetime.strptime(row[0], DATETIME_FORMAT)
            if start <= dtime <= end:
                self.hits[row[2]] = self.hits.get(row[2], 0) + 1
        return self.hits

    def yesterdays_log(self):
        """Parse the log for yesterday (local time), 00:00:00 to 23:59:59.

        Returns:
            The address -> count dict produced by ``parse_log``.
        """
        # Subtract a timedelta instead of decrementing the day number:
        # ``day - 1`` is 0 on the first of a month, which datetime() rejects.
        yesterday = datetime.now() - timedelta(days=1)
        yesterday_start = datetime(
            yesterday.year, yesterday.month, yesterday.day, 0, 0, 0)
        yesterday_end = datetime(
            yesterday.year, yesterday.month, yesterday.day, 23, 59, 59)
        return self.parse_log(start=yesterday_start, end=yesterday_end)
def find_top_hitters(hits):
    """Return ``(count, addresses)`` for the most frequent address(es).

    Args:
        hits: dict mapping address -> hit count.

    Returns:
        A tuple of the highest count and the sorted list of every address
        that reached it; ``(0, [])`` when *hits* is empty.
    """
    if not hits:
        return 0, []
    highest_hits = max(hits.values())
    highest_ips = sorted(
        ip for ip, count in hits.items() if count == highest_hits)
    return highest_hits, highest_ips


def main(argv=None):
    """Report which address(es) hit the given log file most often yesterday.

    Args:
        argv: command-line arguments without the program name; defaults to
            ``sys.argv[1:]`` when ``None``.
    """
    usage = "Usage: %prog [options] logfile"
    parser = OptionParser(usage=usage)
    (options, args) = parser.parse_args(argv)
    if len(args) != 1:
        sys.exit('Please specify one logfile.')

    # The context manager closes the log file even if parsing fails.
    with open(args[0], 'rb') as logfile:
        hits = LogParser(logfile).yesterdays_log()

    highest_hits, highest_ips = find_top_hitters(hits)
    # Single-argument print(...) behaves the same on Python 2 and 3.
    if highest_ips:
        print('%d hits Yesterday by the following ip(s):' % highest_hits)
        for ip in highest_ips:
            print(' %s' % ip)
    else:
        print('There were no hits yesterday')


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment