Skip to content

Instantly share code, notes, and snippets.

@cdunklau
Last active August 29, 2015 14:10
Show Gist options
  • Save cdunklau/858d7c27d5d9ba7086ac to your computer and use it in GitHub Desktop.
Save cdunklau/858d7c27d5d9ba7086ac to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""
Find failed login attempts in auth log, save results in a sqlite database,
and print a report.
"""
from __future__ import print_function
import os
import re
import sqlite3
import itertools
import argparse
import textwrap
from datetime import datetime
# Sample auth-log line used by the import-time self-checks below.
demo = (
    'Nov 25 13:41:24 davinci sshd[11388]: Failed password for root from '
    '111.74.239.143 port 4293 ssh2'
)

# Space-separated pieces of the pattern that recognises a failed password
# attempt and captures the user name, IPv4 source address, source port and
# protocol.
LOGIN_FAIL_PARTS = [
    r'Failed password for(?: invalid user)?',
    r'(?P<user>\S+)',
    r'from',
    r'(?P<source>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
    r'port',
    r'(?P<sourceport>\d+)',
    r'(?P<protocol>\S+)'
]
LOGIN_FAIL_REGEX = re.compile(' '.join(LOGIN_FAIL_PARTS))
assert LOGIN_FAIL_REGEX.search(demo)

# strptime format of the (year-less) timestamp at the start of each log line.
DATETIME_FORMAT = '%b %d %H:%M:%S'

# Space-separated pieces of the pattern for the line prefix: timestamp,
# logging host, process name and pid.
METADATA_PARTS = [
    # Traditional syslog pads days 1-9 with a space instead of a zero
    # ("Nov  5 13:41:24"), so the first day character may be a blank.
    r'(?P<timestamp>\w{3} [ \d]\d \d\d:\d\d:\d\d)',
    r'(?P<thishost>\S+)',
    r'(?P<procname>\w+)\[(?P<pid>\d+)\]',
]
METADATA_REGEX = re.compile(' '.join(METADATA_PARTS))
assert METADATA_REGEX.match(demo)
assert METADATA_REGEX.match(demo.replace('Nov 25', 'Nov  5'))
# Parse with an explicit year: year-less day-of-month parsing is deprecated
# in recent Python versions.
assert datetime.strptime(
    '2014 ' + METADATA_REGEX.match(demo).group('timestamp'),
    '%Y ' + DATETIME_FORMAT)

# (column name, SQLite declared type) pairs for the ``failures`` table.
# Every name matches a named group in one of the two regexes above.
COLUMNS = [
    ('timestamp', 'timestamp'),
    ('thishost', 'text'),
    ('procname', 'text'),
    ('pid', 'integer'),
    ('user', 'text'),
    ('source', 'text'),
    # Application.get_failures converts this value to int, like pid.
    ('sourceport', 'integer'),
    ('protocol', 'text'),
]
SCHEMA = "CREATE TABLE failures ({0})".format(
    ', '.join('{0} {1}'.format(*item) for item in COLUMNS)
)
class Application(object):
    """
    Collect failed login attempts from an auth log into a SQLite database
    and print a per-source summary.

    Unless ``report_only`` is set, every run drops and recreates the
    ``failures`` table before re-parsing the log.
    """

    # Open sqlite3 connection; assigned by setup_database().
    db = None
    # Maximum number of rows inserted per transaction by store_failures().
    CHUNK_SIZE = 100

    def __init__(self, database_filename, auth_log_filename, report_only):
        """
        :param database_filename: path of the SQLite database file.
        :param auth_log_filename: path of the auth log to parse.
        :param report_only: if true, neither parse the log nor recreate
            the table; only report on data already in the database.
        :raises OSError: if the auth log cannot be stat'ed and it is
            actually needed (i.e. ``report_only`` is false).
        """
        self.database_filename = database_filename
        self.auth_log_filename = auth_log_filename
        self.report_only = report_only
        # Log lines carry no year, so the year of the log file's last
        # modification is used for every parsed timestamp.
        try:
            log_mtime = os.stat(self.auth_log_filename).st_mtime
        except OSError:
            # In report-only mode the log is never read, so a missing log
            # must not prevent reporting on an existing database.
            if not report_only:
                raise
            self.auth_log_year = None
        else:
            self.auth_log_year = datetime.fromtimestamp(log_mtime).year

    def run(self):
        """Populate the database (unless report-only) and print the report."""
        self.setup_database()
        if not self.report_only:
            print('Finding login failures...')
            with open(self.auth_log_filename) as f:
                # get_failures() is lazy: the file has to stay open until
                # store_failures() has consumed every row.
                failures = self.get_failures(f)
                self.store_failures(failures)
        print('Collecting summary...')
        summary_rows = list(self.get_summary())
        summary_fmt = (
            'I found {attempts} login failures from {source}:\n'
            '  they began on {earliest} and ended on {latest}\n'
            '  and tried usernames {users}'
        )
        for row in summary_rows:
            print(summary_fmt.format(**row))

    def setup_database(self):
        """
        Open the database and, unless report-only, recreate the
        ``failures`` table from SCHEMA.
        """
        print('Opening database file...')
        self.db = sqlite3.connect(self.database_filename)
        # Rows behave like mappings, which run() relies on for formatting.
        self.db.row_factory = sqlite3.Row
        if not self.report_only:
            print('Clearing database...')
            self.db.execute('DROP TABLE IF EXISTS failures')
            self.db.execute(SCHEMA)

    def get_failures(self, lines):
        """
        Lazily yield one dict per failed-login line found in ``lines``.

        Each dict holds the named groups of LOGIN_FAIL_REGEX and
        METADATA_REGEX, with ``sourceport`` and ``pid`` converted to int
        and ``timestamp`` converted to a datetime in ``auth_log_year``.

        :param lines: iterable of log lines.
        :raises Exception: if a line contains 'Failed password' but does
            not match one of the two regexes.
        """
        for line in lines:
            if 'Failed password' not in line:
                continue
            fail_match = LOGIN_FAIL_REGEX.search(line)
            if not fail_match:
                raise Exception(
                    'No login fail match in line {0!r}'.format(line))
            meta_match = METADATA_REGEX.match(line)
            if not meta_match:
                raise Exception('No metadata match in line {0!r}'.format(line))
            info = fail_match.groupdict()
            info.update(meta_match.groupdict())
            for intkey in ('sourceport', 'pid'):
                info[intkey] = int(info[intkey])
            info['timestamp'] = self._parse_timestamp(info['timestamp'])
            yield info

    def _parse_timestamp(self, text):
        """Parse a year-less log timestamp as a datetime in auth_log_year."""
        # Logs don't have a year.  Parsing with the year included (rather
        # than replacing it afterwards) lets "Feb 29" parse in leap years;
        # strptime's default year, 1900, is not a leap year.
        stamp = '{0} {1}'.format(self.auth_log_year, text)
        return datetime.strptime(stamp, '%Y ' + DATETIME_FORMAT)

    @staticmethod
    def _to_db_row(info):
        """Return a copy of ``info`` with the timestamp as an ISO string."""
        # Same text the sqlite3 default datetime adapter would produce,
        # but without relying on that (deprecated) implicit adapter.
        row = dict(info)
        stamp = row.get('timestamp')
        if isinstance(stamp, datetime):
            row['timestamp'] = stamp.isoformat(' ')
        return row

    def store_failures(self, failures):
        """
        Insert every dict from ``failures`` into the ``failures`` table,
        committing after each batch of at most CHUNK_SIZE rows.

        :param failures: iterable of dicts keyed by the COLUMNS names.
        """
        cursor = self.db.cursor()
        cnames = ', '.join(name for name, _ in COLUMNS)
        cplaces = ', '.join(':' + name for name, _ in COLUMNS)
        sql = 'INSERT INTO failures ({0}) VALUES ({1})'.format(cnames, cplaces)
        rows = iter(failures)
        while True:
            # islice also returns the final, shorter batch; zipping the
            # iterator with itself would silently drop it.
            chunk = [self._to_db_row(info)
                     for info in itertools.islice(rows, self.CHUNK_SIZE)]
            if not chunk:
                break
            with self.db:
                cursor.executemany(sql, chunk)

    def get_summary(self):
        """
        Get the source, count of attempts, earliest occurrence,
        latest occurrence, and list of usernames attempted.

        :returns: cursor over one row per source, most attempts first.
        """
        sql = """
            SELECT
                source,
                count(*) AS attempts,
                min(timestamp) as earliest,
                max(timestamp) as latest,
                group_concat(DISTINCT user) as users
            FROM failures GROUP BY source ORDER BY attempts DESC"""
        return self.db.execute(sql)
def main():
    """
    Command-line entry point: read the database path and options from
    the command line, then build an Application and run it.
    """
    # The module docstring doubles as the --help description.
    description = textwrap.dedent(__doc__)
    cli = argparse.ArgumentParser(description=description)
    cli.add_argument('dbfile', help='Database output filename')
    cli.add_argument(
        '--authlog',
        default='/var/log/auth.log',
        help='Path to auth log to parse',
    )
    cli.add_argument(
        '--report-only',
        action='store_true',
        help=(
            "Don't parse the log or recreate the database, "
            "just use the existing data"
        ),
    )
    options = cli.parse_args()
    Application(options.dbfile, options.authlog, options.report_only).run()
# Run the command-line interface only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment