#!/usr/bin/env python
"""
Find failed login attempts in auth log, save results in a sqlite database,
and print a report.
"""
from __future__ import print_function
import os
import re
import sqlite3
import itertools
import argparse
import textwrap
from datetime import datetime

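# Example auth.log line, used below to sanity-check the regexes at import time.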
demo = (
    'Nov 25 13:41:24 davinci sshd[11388]: Failed password for root from '
    '111.74.239.143 port 4293 ssh2'
)
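# Pieces of the sshd failure message, joined with single spaces into one regex.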
LOGIN_FAIL_PARTS = [
    r'Failed password for(?: invalid user)?',
    r'(?P<user>\S+)',
    r'from',
    r'(?P<source>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
    r'port',
    r'(?P<sourceport>\d+)',
    r'(?P<protocol>\S+)',
]
LOGIN_FAIL_REGEX = re.compile(' '.join(LOGIN_FAIL_PARTS))
assert LOGIN_FAIL_REGEX.search(demo)

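# Syslog timestamps look like 'Nov 25 13:41:24' and carry no year.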
DATETIME_FORMAT = '%b %d %H:%M:%S'
METADATA_PARTS = [
    # Single-digit days are space-padded in syslog, e.g. 'Nov  5 13:41:24'.
    r'(?P<timestamp>\w{3} [ \d]\d \d\d:\d\d:\d\d)',
    r'(?P<thishost>\S+)',
    r'(?P<procname>\w+)\[(?P<pid>\d+)\]',
]
METADATA_REGEX = re.compile(' '.join(METADATA_PARTS))
assert METADATA_REGEX.match(demo)
assert datetime.strptime(METADATA_REGEX.match(demo).group('timestamp'),
                         DATETIME_FORMAT)

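# Column names mirror the regex group names, so parsed rows can be inserted
# with named placeholders.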
COLUMNS = [
    ('timestamp', 'timestamp'),
    ('thishost', 'text'),
    ('procname', 'text'),
    ('pid', 'integer'),
    ('user', 'text'),
    ('source', 'text'),
    ('sourceport', 'text'),
    ('protocol', 'text'),
]
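# Expands to:
#   CREATE TABLE failures (timestamp timestamp, thishost text, ..., protocol text)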
SCHEMA = "CREATE TABLE failures ({0})".format(
    ', '.join('{0} {1}'.format(*item) for item in COLUMNS)
)


class Application(object):
    db = None

    def __init__(self, database_filename, auth_log_filename, report_only):
        self.database_filename = database_filename
        self.auth_log_filename = auth_log_filename
        # Log timestamps carry no year, so borrow it from the log file's
        # modification time.
        log_mtime = os.stat(self.auth_log_filename).st_mtime
        self.auth_log_year = datetime.fromtimestamp(log_mtime).year
        self.report_only = report_only
    def run(self):
        self.setup_database()
        if not self.report_only:
            print('Finding login failures...')
            with open(self.auth_log_filename) as f:
                failures = self.get_failures(f)
                self.store_failures(failures)
        print('Collecting summary...')
        summary_rows = list(self.get_summary())
        summary_fmt = (
            'I found {attempts} login failures from {source}:\n'
            ' they began on {earliest} and ended on {latest}\n'
            ' and tried usernames {users}'
        )
        for row in summary_rows:
            print(summary_fmt.format(**row))

    def setup_database(self):
        print('Opening database file...')
        self.db = sqlite3.connect(self.database_filename)
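        # sqlite3.Row rows can be looked up by column name, which lets run()
        # format the summary with **row.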
        self.db.row_factory = sqlite3.Row
        if not self.report_only:
            print('Clearing database...')
            self.db.execute('DROP TABLE IF EXISTS failures')
            self.db.execute(SCHEMA)

    def get_failures(self, lines):
        for line in lines:
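            # Cheap substring test first; only run the regexes on lines that
            # could possibly match.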
            if 'Failed password' not in line:
                continue
            m = LOGIN_FAIL_REGEX.search(line)
            if not m:
                raise Exception(
                    'No login fail match in line {0!r}'.format(line))
            info = m.groupdict()
            m = METADATA_REGEX.match(line)
            if not m:
                raise Exception('No metadata match in line {0!r}'.format(line))
            info.update(m.groupdict())
            for intkey in ('sourceport', 'pid'):
                info[intkey] = int(info[intkey])
            when = datetime.strptime(info['timestamp'], DATETIME_FORMAT)
            # Logs don't have a year
            info['timestamp'] = when.replace(year=self.auth_log_year)
            yield info

    def store_failures(self, failures):
        cursor = self.db.cursor()
        cnames = ', '.join(name for name, _ in COLUMNS)
        cplaces = ', '.join(':' + name for name, _ in COLUMNS)
        sql = 'INSERT INTO failures ({0}) VALUES ({1})'.format(cnames, cplaces)
        # Insert in chunks of 100 rows, one transaction per chunk. islice
        # keeps the final partial chunk, which izip-based grouping would drop.
        failures = iter(failures)
        while True:
            chunk = list(itertools.islice(failures, 100))
            if not chunk:
                break
            with self.db:
                cursor.executemany(sql, chunk)

    def get_summary(self):
        """
        Get the source, count of attempts, earliest occurrence,
        latest occurrence, and list of usernames attempted.
        """
        sql = """
            SELECT
                source,
                count(*) AS attempts,
                min(timestamp) AS earliest,
                max(timestamp) AS latest,
                group_concat(DISTINCT user) AS users
            FROM failures GROUP BY source ORDER BY attempts DESC"""
        return self.db.execute(sql)


def main():
    parser = argparse.ArgumentParser(description=textwrap.dedent(__doc__))
    parser.add_argument('dbfile', help='Database output filename')
    parser.add_argument(
        '--authlog', default='/var/log/auth.log',
        help='Path to auth log to parse')
    parser.add_argument(
        '--report-only', action='store_true',
        help="Don't parse the log or recreate the database, just use "
             "the existing data")
    args = parser.parse_args()
    app = Application(args.dbfile, args.authlog, args.report_only)
    app.run()


if __name__ == '__main__':
    main()