Skip to content

Instantly share code, notes, and snippets.

@yorickvP
Last active April 2, 2018 14:37
Show Gist options
  • Save yorickvP/c7b64b0999b3b84e9213ad292397ae10 to your computer and use it in GitHub Desktop.
Save yorickvP/c7b64b0999b3b84e9213ad292397ae10 to your computer and use it in GitHub Desktop.
auto audit extravaganza
# asserts: 64-bit
# auditd running
# sudo auditctl -a exit,always -F arch=b64 -S connect
import codecs, struct, socket, sys, os, re
class AuditLog(object):
def __init__(self):
self.last_event = -1
self.collected = dict()
self.inodere = re.compile("socket:\[(\d+)\]")
def handle_syscall_event(self, event):
if event['SYSCALL']['syscall'] == 42:
self.handle_connect(event)
def find_local_address(self, inode):
with open("/proc/net/tcp", 'r') as f:
[hdr, *lines] = f
# header = dict((v,k) for k,v in enumerate(hdr.split()))
header = {"inode": 9, "local_address": 1}
for line in lines:
line = line.split()
if line[header['inode']] == inode:
ip, port = (codecs.decode(s, 'hex') for s in line[header['local_address']].split(':'))
port, = struct.unpack_from(">H", port)
ip = socket.inet_ntoa(bytes(reversed(ip)))
return ip, port
def findinode(self, pid, fd):
try:
sockstr = os.readlink("/proc/{}/fd/{}".format(pid, fd))
match = self.inodere.fullmatch(sockstr)
if match:
return match[1]
except FileNotFoundError: pass
def handle_connect(self, event):
proctitle = event['PROCTITLE']['proctitle']
saddr = event['SOCKADDR']['saddr']
connect = event['SYSCALL']
if saddr.family == socket.AF_INET and saddr.port == 80:
print('{} connected to {}:{}'.format(proctitle, saddr.ip, saddr.port))
inode = self.findinode(connect['pid'], connect['a0'])
if inode:
laddr = self.find_local_address(inode)
print(inode, laddr)
def handle_line(self, line):
try:
d = dict(x.split('=', 1) for x in line.split(' ') if x)
except:
print("failed to parse {}".format(line))
return
if d['msg'] != self.last_event:
self.collected = dict()
self.last_event = d['msg']
self.collected[d['type']] = d
if d['type'] == "PROCTITLE":
if d['proctitle'][0] == '"':
d['proctitle'] = d['proctitle'][1:-1]
else:
d['proctitle'] = codecs.decode(d['proctitle'], 'hex').replace(b'\x00', b' ')
d['proctitle'] = d['proctitle'].decode('utf8')
if d['type'] == "SYSCALL":
d['syscall'] = int(d['syscall'])
if d['type'] == "SOCKADDR":
d['saddr'] = Saddr(d['saddr'])
if self.collected.keys() == set(('SYSCALL', 'PROCTITLE', 'SOCKADDR')):
self.handle_syscall_event(self.collected)
class Saddr(object):
def __init__(self, saddr):
self.raw = codecs.decode(saddr, 'hex')
self.family, = struct.unpack_from("@H", self.raw)
if self.family == socket.AF_INET:
self.port, ip = struct.unpack_from("!H4s", self.raw, offset=2)
self.ip = socket.inet_ntoa(ip)
elif self.family == socket.AF_UNIX:
self.fname = self.raw[2:]
au = AuditLog()
while True:
l = sys.stdin.readline().strip()
if not l: break
au.handle_line(l)
#!/usr/bin/env nix-shell
#!nix-shell -i bash -p bash python3 audit
mkdir -p /var/log/audit
HAD_AUDITD=$(pgrep ^auditd)
if [ ! $HAD_AUDITD ]; then
auditd
fi
function cleanup {
auditctl -d exit,always -F arch=b64 -S connect
if [ ! $HAD_AUDITD ]; then
kill $HAD_AUDITD
fi
}
trap cleanup EXIT
auditctl -a exit,always -F arch=b64 -S connect
tail -f /var/log/audit/audit.log | python3 filter_tcp_80.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment