Last active
May 29, 2020 08:46
-
-
Save athoune/8190f92fe954563cf520225319019708 to your computer and use it in GitHub Desktop.
nia nia nia ssh public key
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import pwd | |
import sys | |
import re | |
import subprocess | |
import tempfile | |
import os | |
import io | |
from collections import namedtuple | |
import socket | |
# Matches the message part of an sshd "Accepted publickey" log entry.
# Groups: 1=user, 2=remote address, 3=port, 4=key algorithm, 5=fingerprint.
ACCEPTED = re.compile(r"Accepted publickey for ([^ ]+) from ([^ ]+) port (\d+) ssh2: (\w+) ([^ ]+)")

# Legacy pattern for the leading options field of an authorized_keys line.
# Kept so the module-level name stays available, but ssh_key() no longer uses
# it: it mis-splits lists such as `no-pty,command="a b"` and its nested
# quantifier can backtrack exponentially on long space-free input.
OPTIONS = re.compile(r'^((([a-z\-]+="[^"]+")|([^ ]+))+) ')

# One parsed authorized_keys entry.
Key = namedtuple('Key', ['type', 'key', 'options', 'comment'])
# One parsed line of `ssh-keygen -l` output.
Fingerprint = namedtuple('Fingerprint', ['size', 'hash', 'type'])

# Key types accepted in authorized_keys, as listed in sshd(8).
_KEY_TYPES = frozenset({
    'sk-ecdsa-sha2-nistp256@openssh.com',
    'ecdsa-sha2-nistp256',
    'ecdsa-sha2-nistp384',
    'ecdsa-sha2-nistp521',
    'sk-ssh-ed25519@openssh.com',
    'ssh-ed25519',
    'ssh-dss',
    'ssh-rsa',
})


def _split_options(entry):
    """Split a leading options field off *entry*.

    The options field ends at the first space or tab that is not inside
    double quotes; a backslash inside quotes escapes the next character.

    Returns ``(options, rest)`` where *options* keeps its trailing separator
    (as the previous regex-based implementation did), or ``(None, entry)``
    when no unquoted separator exists.
    """
    in_quotes = False
    escaped = False
    for idx, char in enumerate(entry):
        if escaped:
            escaped = False
        elif char == '\\' and in_quotes:
            escaped = True
        elif char == '"':
            in_quotes = not in_quotes
        elif char in ' \t' and not in_quotes:
            return entry[:idx + 1], entry[idx + 1:]
    return None, entry


def ssh_key(line):
    """Parse one authorized_keys line into a ``Key``.

    Args:
        line: a single non-comment line, with or without a leading options
            field and a trailing comment.

    Returns:
        ``Key(type, key, options, comment)``. ``options`` is ``None`` when the
        line starts directly with a key type, otherwise the raw options text
        including its trailing separator. ``comment`` is ``None`` when absent.

    Raises:
        ValueError: if the line is empty, has an unknown key type, or has no
            key blob after the key type.
    """
    entry = line.strip()
    first = entry.split(None, 1)
    if not first:
        raise ValueError("empty authorized_keys entry")

    options = None
    if first[0] not in _KEY_TYPES:
        # Anything before the key type must be the options field.
        options, entry = _split_options(entry)

    slugs = entry.split(None, 2)
    if not slugs or slugs[0] not in _KEY_TYPES:
        raise ValueError("unknown key type: %r" % (slugs[0] if slugs else ''))
    if len(slugs) < 2:
        raise ValueError("missing key data after %r" % slugs[0])

    comment = slugs[2] if len(slugs) == 3 else None
    return Key(slugs[0], slugs[1], options, comment)
def keys(user):
    """Yield ``(Fingerprint, comment)`` for each key *user* has authorized.

    Reads ``<home>/.ssh/authorized_keys`` for the given account and runs
    ``ssh-keygen -l`` on every non-blank, non-comment line to obtain its
    fingerprint.

    Args:
        user: login name, resolved through ``pwd.getpwnam``.

    Yields:
        Tuples of ``(Fingerprint(size, hash, type), comment)`` where *comment*
        is the comment field parsed by ``ssh_key`` (or ``None``).

    Raises:
        KeyError: if the user does not exist (from ``pwd.getpwnam``).
        OSError: if authorized_keys cannot be read or ssh-keygen cannot run.
        RuntimeError: if ssh-keygen exits non-zero; args are the offending
            line and ssh-keygen's stderr bytes.
    """
    home = pwd.getpwnam(user).pw_dir
    with open(os.path.join(home, '.ssh', 'authorized_keys'), 'r') as authorized:
        entries = authorized.readlines()

    for line in entries:
        stripped = line.strip()
        if not stripped or stripped.startswith('#'):
            continue
        k = ssh_key(line)

        # ssh-keygen reads from a file, so stage the single line in a scratch
        # file and always remove it afterwards.
        fd, tmp = tempfile.mkstemp()
        try:
            with open(fd, 'w') as scratch:
                scratch.write(line)
            proc = subprocess.run(
                ['ssh-keygen', '-l', '-f', tmp],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        finally:
            os.unlink(tmp)

        if proc.returncode != 0:
            raise RuntimeError(line, proc.stderr)

        # Parsed as "<bits> <hash> <comment...> (<TYPE>)".
        rr = proc.stdout.strip().decode('utf8').split(' ')
        yield Fingerprint(int(rr[0]), rr[1], rr[-1][1:-1]), k.comment
class Keys:
    """Per-user cache mapping key fingerprints to authorized_keys comments.

    Calling an instance with ``(user, fingerprint)`` returns the comment of
    the matching key. A user's authorized_keys is scanned (via ``keys``) only
    on the first lookup for that user.
    """

    def __init__(self):
        # user -> {fingerprint hash -> comment}
        self._users = dict()

    def __call__(self, user, fingerprint):
        """Return the comment of *user*'s key whose hash is *fingerprint*.

        Raises:
            KeyError: if the user has no key with that fingerprint.
            Exception: whatever ``keys(user)`` raises on the first lookup.
        """
        if user not in self._users:
            # Build the whole mapping before caching it, so a failure while
            # scanning does not leave a partial entry that later lookups
            # would trust.
            self._users[user] = {
                fp.hash: comment for fp, comment in keys(user)
            }
        return self._users[user][fingerprint]
class Abuse:
    """Cache of abuse-mailbox contacts per IP address, resolved with whois.

    Calling an instance with an IP runs ``whois -b <ip>`` once and remembers
    the result, including the case where no abuse-mailbox line was found.
    """

    def __init__(self):
        # ip -> abuse mailbox string, or None when whois reported none
        self._abuses = dict()

    def __call__(self, ip):
        """Return the abuse mailbox for *ip*, or ``None`` if none is listed.

        Raises:
            OSError: if the whois command cannot be started.
            Exception: if whois exits non-zero; args are the IP and whois's
                stderr bytes.
        """
        if ip not in self._abuses:
            # run() drains both pipes while waiting, so large whois output
            # cannot block the child on a full pipe.
            proc = subprocess.run(
                ['whois', '-b', ip],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            if proc.returncode != 0:
                raise Exception(ip, proc.stderr)

            mailbox = None
            for raw in proc.stdout.splitlines():
                # whois output is not guaranteed to be valid UTF-8.
                line = raw.decode('utf8', errors='replace')
                if line.startswith("abuse-mailbox"):
                    # Last whitespace-separated token; a later line wins.
                    mailbox = line.split()[-1]
            # Cache misses too, so whois is not re-run for the same IP.
            self._abuses[ip] = mailbox
        return self._abuses[ip]
def read_auth_log():
    """Report who logged in over SSH with a public key, from a log on stdin.

    For every sshd "Accepted publickey" entry read from standard input, this
    prints the split log fields, then the comment of the matching
    authorized_keys entry together with the remote host name. When reverse
    DNS fails, the abuse mailbox from whois is used in place of the name.

    Lines that are not sshd entries, are too short, or do not match the
    ``ACCEPTED`` pattern are skipped.
    """
    key_comment = Keys()
    abuse = Abuse()
    for line in sys.stdin:
        # Split on runs of whitespace: syslog pads single-digit days with an
        # extra space ("May  9"), which would shift fields otherwise.
        slugs = line.rstrip('\n').split(None, 5)
        if len(slugs) < 6 or not slugs[4].startswith("sshd"):
            continue
        comment = slugs[5]
        if not comment.startswith("Accepted publickey"):
            continue
        m = ACCEPTED.match(comment)
        if m is None:
            # e.g. an entry logged without the key algorithm/fingerprint.
            continue
        user = m.group(1)
        ip = m.group(2)
        fp = m.group(5)
        print(slugs)

        try:
            name = socket.gethostbyaddr(ip)[0]
        except (OSError, UnicodeError):
            # No reverse DNS: fall back to the network's abuse contact.
            name = abuse(ip)

        # Best effort per entry: an unknown user or fingerprint must not
        # abort processing of the rest of the log.
        try:
            print(key_comment(user, fp), name)
        except Exception as e:
            print("aaaaaaah", e)


if __name__ == "__main__":
    read_auth_log()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment