Created
May 27, 2014 12:49
-
-
Save foxx/120cfe6dab28516e73e0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import re | |
import datetime | |
import smtplib | |
import itertools | |
from email.mime.text import MIMEText | |
class ScanAuthLog(object):
    """
    ssh auth.log scanner

    Scans an ssh authentication log and sends the detected failures via email.

    This script should NOT be used in production for the following reasons:

    * String matching rules have only been checked against Ubuntu 12.x and
      there are no unit tests. Lines that do not fit one of the known shapes
      are skipped, so unusual failure messages go unreported.
    * Log scanning should never use regex. The amount of data being parsed
      usually means it's better to custom write the parsers using string
      extraction, manual index offsets, and hinting. This gets more
      noticeable when scanning huge log files, e.g. 20GB+
    * Sending log events via email is not production worthy. Seriously,
      Splunk or any other log collection service would be much more suited
      for the job
    * Fail2ban and other daemons already handle a lot of the necessary
      parsing and string detection, they will be more mature and also handle
      email notifications.
    * This does not work with multi-line events, and every detected event is
      kept in memory until the email is built.
    * No pid state, and no duplication detection (e.g. if run twice, you get
      the same results twice). The longer the file grows, the more results
      you'll get. You could store the last known timestamp or byte offset in
      a state file, but again, why bother when better tools already do this
    """

    # Patterns are compiled once at class-definition time and shared by all
    # instances.

    # Syslog-style prefix: "<Mon> <day> <HH:MM:SS> <host> <proc>".
    # The month/day gap uses \s+ because classic syslog pads single-digit
    # days with an extra space ("Jun  1 00:00:01").
    re_ts_match = re.compile(r'(?P<timestamp>\w+\s+\w+ \d{2}:\d{2}:\d{2}) (?P<host>.+) (?P<proc>.+)', re.IGNORECASE)

    # Failed password for invalid user user1 from 127.0.0.1 port 53024 ssh2
    # Failed password for root from 127.0.0.1 port 53025 ssh2
    re_match1 = re.compile(r'Failed password for (?:invalid user |)(?P<username>.+) from (?P<host>.+) port (?P<port>.+) (?P<service>.+)', re.IGNORECASE)

    # Invalid user user1 from 127.0.0.1
    re_match2 = re.compile(r'Invalid user (?P<username>.+) from (?P<host>.+)', re.IGNORECASE)

    # input_userauth_request: invalid user roohfhfdhfdhfdt [preauth]
    re_match3 = re.compile(r'input_userauth_request: invalid user (?P<username>.+) \[preauth\]', re.IGNORECASE)

    def check(self, path):
        """Scan the log file at *path* and email a report of the failures.

        Every line is passed to :meth:`process_line`; the recognised events
        are grouped by description and sent as a plain-text email through
        the SMTP server on ``localhost``. The email is sent even when no
        events were found.

        Raises whatever ``open`` or ``smtplib`` raise. If processing a line
        raises unexpectedly, the line is printed and the error re-raised.
        """
        results = []
        with open(path, 'rb') as fh:
            # Iterate the file object directly so the whole log is never
            # loaded into memory at once.
            for raw in fh:
                line = self._to_text(raw)
                try:
                    event = self.process_line(line)
                except Exception:
                    # Show which line triggered the failure, then propagate.
                    print("Error processing line: " + line)
                    raise
                if event:
                    results.append(event)

        # create simple email
        e_subject = "Authentication failures: %s" % (datetime.datetime.now(),)
        e_from = "XXX"
        e_to = "XXX"

        msg = MIMEText(self._format_report(results))
        msg['Subject'] = e_subject
        msg['From'] = e_from
        msg['To'] = e_to

        s = smtplib.SMTP('localhost')
        try:
            s.sendmail(e_from, [e_to], msg.as_string())
        finally:
            # Always release the connection, even when sending fails.
            try:
                s.quit()
            except smtplib.SMTPServerDisconnected:
                # Already disconnected; nothing left to close politely.
                pass

    def process_line(self, line):
        """Parse one log line into an event dict, or return ``None``.

        *line* may be text or a byte string (bytes are decoded as UTF-8 with
        undecodable bytes replaced).

        Returns a dict with the keys ``error_desc`` (event description),
        ``meta`` (``timestamp``/``host``/``proc`` from the line prefix) and
        ``info`` (the fields captured from the message). Returns ``None``
        for lines that are not a recognised failure or that do not have the
        expected ``"<prefix>: <message>"`` shape.
        """
        line = self._to_text(line).strip()

        # break into components
        if ": " not in line:
            return None
        part1, part2 = line.split(": ", 1)

        # Cheap substring hints pick the candidate pattern so that unrelated
        # lines never reach a regex. The hints are case-sensitive on purpose:
        # "Invalid user" and "...: invalid user" select different patterns.
        if 'Failed password' in part2:
            pattern = self.re_match1
            error_desc = "Failed password authentication"
        elif 'Invalid user' in part2:
            pattern = self.re_match2
            error_desc = "Invalid user"
        elif 'input_userauth_request: invalid user' in part2:
            pattern = self.re_match3
            error_desc = "Invalid user"
        else:
            return None

        # The pattern must match from the start of the message; wrapped
        # messages (e.g. "message repeated 2 times: [...]") are skipped.
        m = pattern.match(part2)
        if m is None:
            return None

        # no datetime parsing, not without modifying the ssh logger
        prefix = self.re_ts_match.match(part1)
        if prefix is None:
            return None

        return dict(error_desc=error_desc, meta=prefix.groupdict(), info=m.groupdict())

    @staticmethod
    def _to_text(line):
        """Return *line* as ``str``, decoding byte strings as UTF-8."""
        if isinstance(line, str):
            return line
        return line.decode('utf-8', 'replace')

    @staticmethod
    def _format_report(results):
        """Build the email body from event dicts, grouped by description."""
        def by_desc(event):
            return event['error_desc']

        parts = ["Please see failed auth events below\r\n"]
        # groupby only merges adjacent items, so sort by the same key first.
        for error_desc, events in itertools.groupby(sorted(results, key=by_desc), by_desc):
            parts.append("\r\nEvent type: %s\r\n" % (error_desc,))
            for event in events:
                items = ["=".join(pair) for pair in event['info'].items()]
                items += ["=".join(pair) for pair in event['meta'].items()]
                parts.append(" " + ", ".join(items) + "\r\n")
        return "".join(parts)
#(on another screen testing regex, sec) | |
if __name__ == '__main__':
    # Script entry point: scan the system auth log and email the report.
    ScanAuthLog().check('/var/log/auth.log')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment