Last active
November 20, 2017 08:23
-
-
Save niwinz/5067540 to your computer and use it in GitHub Desktop.
PF real-time network monitor tool (per-IP traffic statistics)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
from subprocess import Popen, PIPE | |
import argparse | |
import shlex | |
import datetime | |
import re | |
import ipaddress as ip | |
import functools | |
import os | |
import os.path | |
import threading | |
import json | |
import curses | |
import time | |
import socket | |
import functools | |
@functools.lru_cache(maxsize=100000, typed=False)
def humanize_bytes(bytes, precision=1):
    """Format a byte count as a short human-readable string.

    The value is divided by the largest binary unit (kB = 2**10 up to
    PB = 2**50) that it reaches and printed with ``precision`` decimal
    places.  Exactly one byte is special-cased as ``'1 byte'``; values
    below one fall back to the plain ``bytes`` unit.  Results are
    memoized by the surrounding ``lru_cache``.

    >>> humanize_bytes(1)
    '1 byte'
    >>> humanize_bytes(1024)
    '1.0 kB'
    >>> humanize_bytes(1024*123)
    '123.0 kB'
    >>> humanize_bytes(1024*12342)
    '12.1 MB'
    >>> humanize_bytes(1024*12342,2)
    '12.05 MB'
    >>> humanize_bytes(1024*1234,2)
    '1.21 MB'
    >>> humanize_bytes(1024*1234*1111,2)
    '1.31 GB'
    >>> humanize_bytes(1024*1234*1111,1)
    '1.3 GB'
    """
    if bytes == 1:
        return '1 byte'

    # Ordered from largest to smallest so the first hit is the best fit.
    units = (
        (1 << 50, 'PB'),
        (1 << 40, 'TB'),
        (1 << 30, 'GB'),
        (1 << 20, 'MB'),
        (1 << 10, 'kB'),
        (1, 'bytes'),
    )
    # Values smaller than every unit (zero, negatives) use the last entry.
    divisor, label = next(
        (unit for unit in units if bytes >= unit[0]),
        units[-1],
    )
    return '%.*f %s' % (precision, bytes / divisor, label)
def auto_lock(method):
    """Decorator that serializes calls to ``method`` behind a lock.

    One lock is created when the decorator is applied and is shared by
    every later call of the wrapped callable, so two threads can never
    execute it at the same time.  (Creating the lock inside the wrapper
    would hand each call its own private lock and exclude nothing.)

    The lock is re-entrant, so a decorated function may call itself
    recursively without deadlocking.

    :param method: the callable to protect.
    :returns: a wrapper with the same metadata and return value.
    """
    lock = threading.RLock()

    @functools.wraps(method)
    def _decorator(*args, **kwargs):
        with lock:
            return method(*args, **kwargs)
    return _decorator
class Status(object):
    """In-memory traffic table plus a couple of bookkeeping flags.

    ``data`` maps a key (the callers in this file use a host IP string)
    to a dict with the counters ``in``, ``out`` (bytes) and ``packets``.
    """

    def __init__(self):
        self.running = False
        self.data = {}
        # Filled in by update_stamp(); defined up front so reading it
        # before the first stamp yields None instead of AttributeError.
        self.last_updated = None

    def update_stamp(self):
        """Record the current local time in ``last_updated``."""
        self.last_updated = datetime.datetime.now()

    def set_worker_status(self, running):
        """Store whether the capture worker is currently running."""
        self.running = running

    def generate_new_keydata(self):
        """Return a fresh, zeroed counter record for a new key."""
        return {
            "in": 0,  # bytes
            "out": 0,  # bytes
            "packets": 0,  # total number of packets analyzed
        }

    def update_key(self, key, dt):
        """Account one packet against ``key``.

        :param key: entry to update; created on first use.
        :param dt: mapping that must provide ``direction`` (``"in"`` or
            ``"out"``, used as the counter name) and ``bytes`` (anything
            ``int()`` accepts).
        """
        if key not in self.data:
            self.data[key] = self.generate_new_keydata()

        data = self.data[key]
        data["packets"] += 1

        # Update total traffic stats.  Only per-key totals are kept;
        # per-port statistics are not tracked.
        direction = dt["direction"]
        data[direction] += int(dt["bytes"])
# Module-wide traffic table: written by StreamService and read by
# CurrentSpeedService and Ui through the global name ``status``.
status = Status()
# Stamp it once at import time.
status.update_stamp()
class StreamService(threading.Thread):
    """Worker thread that parses pf log lines into the global ``status``.

    ``dest`` is either an existing file path (its content is streamed
    with ``_file_cmd``) or, when no such path exists, the name of an
    interface to capture from with ``_realtime_cmd``.  Every output line
    that matches ``rx`` is accounted against one host in ``status``.
    """

    # Command templates; ``{0}`` is replaced by the path / interface name.
    # File mode just streams the file, so the file has to contain the
    # *text* output of ``tcpdump -n -e -ttt`` already.  To decode a raw
    # capture instead, use "tcpdump -n -e -ttt -r {0}" here.
    _file_cmd = "cat {0}"
    _realtime_cmd = "tcpdump -n -e -ttt -i {0}"

    rx = re.compile(r"""
        ^([^\s]+)\s                             # Code part
        (rule\s[^\s]+)\s                        # Rule number
        (\w+)\s(\w+)\s\w+\s([\w\d]+)\:\s        # Rule definition
        (\d+\.\d+\.\d+\.\d+)\.(\d+)\s           # Source host and port
        [\>\<]\s                                # Direction
        (\d+\.\d+\.\d+\.\d+)\.(\d+)             # Destination host and port
        .*
        length\s(\d+)$                          # Packet length
    """, flags=re.X)

    def __init__(self, dest, stop_event):
        """
        :param dest: file path or interface name to read from.
        :param stop_event: ``threading.Event``; once set, the read loop
            stops after the line it is currently handling.
        """
        self.dest = dest
        self.stop_event = stop_event
        super().__init__()

    def update_status(self, data):
        """Account one parsed packet in the global ``status``.

        :param data: the groups of an ``rx`` match.  Only the last eight
            are used: action, direction, interface, source ip/port,
            destination ip/port and length.
        """
        *_, _action, rdir, rif, src_ip, src_port, dst_ip, dst_port, bs = data

        # The packet is booked on the source address for rule direction
        # "in" and on the destination address otherwise.
        key_host = src_ip if rdir == "in" else dst_ip

        # Skip the unspecified address and anything ending in .255
        # (which also covers 255.255.255.255).
        if key_host == "0.0.0.0" or key_host.endswith(".255"):
            return

        status.update_key(key_host, {
            "src_ip": src_ip,
            "dst_ip": dst_ip,
            "src_port": src_port,
            "dst_port": dst_port,
            "rule_if": rif,
            "bytes": bs,
            # Counters are kept from the host's point of view, hence the
            # inversion of the rule direction.
            "direction": "out" if rdir == "in" else "in",
        })

    def parse_line(self, line):
        """Feed ``line`` to ``update_status`` if it matches ``rx``."""
        res = self.rx.search(line)
        if res:
            self.update_status(res.groups())

    def run(self):
        """Thread entry point: file mode if ``dest`` exists, else live."""
        return self.parse(self.dest, os.path.exists(self.dest))

    def _command_args(self, path, is_file):
        """Build the argv list for the reader process."""
        template = self._file_cmd if is_file else self._realtime_cmd
        # Split the template first and substitute afterwards, so a path
        # containing spaces or quotes stays one single argument.
        return [part.format(path) for part in shlex.split(template)]

    def parse(self, path, is_file=True):
        """Run the reader command and parse its output line by line.

        The loop ends at end of output or after the first line handled
        once ``stop_event`` is set.  The child process is always
        terminated and reaped, and the worker flag in ``status`` is
        always reset, even when an exception escapes.

        :param path: file path or interface name substituted into the
            command template.
        :param is_file: choose ``_file_cmd`` (True) or ``_realtime_cmd``.
        """
        # Local import: the file itself only imports Popen and PIPE.
        from subprocess import DEVNULL

        proc = None
        try:
            # stderr is discarded: nothing reads it, and an unread pipe
            # could fill up and block the child.
            proc = Popen(self._command_args(path, is_file),
                         stdout=PIPE, stderr=DEVNULL, close_fds=True)
            status.set_worker_status(running=True)

            for raw_line in proc.stdout:
                # A stray undecodable byte must not kill the thread.
                self.parse_line(raw_line.decode('utf-8', errors='replace'))
                if self.stop_event.is_set():
                    break
        finally:
            status.set_worker_status(running=False)
            if proc is not None:
                if proc.poll() is None:
                    proc.terminate()
                proc.stdout.close()
                proc.wait()
class Ui(object):
    """Minimal curses table of per-host traffic.

    Rows 0-2 hold the header; hosts are listed from row 3 downwards.
    Columns start at x = 0 (host), 20 (downloaded), 35 (uploaded),
    50 (download speed) and 70 (upload speed).
    """

    def __init__(self):
        self.win = curses.initscr()
        curses.noecho()
        # Reverse-DNS answers keyed by IP, so each address is resolved at
        # most once instead of on every redraw.
        self._host_cache = {}

    def stop(self):
        """Leave curses mode and restore the terminal."""
        curses.endwin()

    def _put(self, row, col, text=""):
        """Move to (row, col) and write ``text`` there, if it fits.

        curses raises ``curses.error`` for positions or text that fall
        outside the window; on a small terminal those cells are simply
        left undrawn rather than crashing the UI.
        """
        try:
            self.win.move(row, col)
            if text:
                self.win.addstr(text)
        except curses.error:
            pass

    def _rule(self, row):
        """Draw a horizontal separator on ``row``, if the row exists."""
        try:
            self.win.hline(row, 0, "-", 100)
        except curses.error:
            pass

    def draw_header(self):
        """Draw the column titles framed by two separator lines."""
        self._rule(0)

        # Draw header
        self._put(1, 0, "Host")
        self._put(1, 20, "Downloaded")
        self._put(1, 35, "Uploaded")
        self._put(1, 50, "Download speed")
        self._put(1, 70, "Upload speed")

        self._rule(2)

    def draw(self):
        """Redraw one row per host, largest ``in`` byte count first.

        Hosts that would fall below the bottom of the window are not
        drawn.
        """
        height, _ = self.win.getmaxyx()
        ranked = sorted(status.data.items(),
                        key=lambda item: item[1]['in'], reverse=True)

        # Three rows are taken by the header.
        visible = ranked[:max(height - 3, 0)]
        for row, (key, data) in enumerate(visible, 3):
            self.clear_line(row)
            self.draw_host(row, key)
            self.draw_downloaded_bytes(row, data)
            self.draw_uploaded_bytes(row, data)
            self.draw_download_speed(row, data)
            self.draw_upload_speed(row, data)

        self.win.refresh()

    def ip_to_host(self, ip):
        """Return the reverse-DNS name of ``ip``, or ``ip`` itself.

        Any resolver failure (``socket.herror``, ``socket.gaierror`` and
        other ``OSError``s) falls back to the address.  The answer,
        including a fallback, is cached for the lifetime of this object.
        """
        try:
            return self._host_cache[ip]
        except KeyError:
            pass

        try:
            name = socket.gethostbyaddr(ip)[0]
        except OSError:
            name = ip
        self._host_cache[ip] = name
        return name

    def clear_line(self, position):
        """Blank row ``position`` from column 0 to the end of the line."""
        try:
            self.win.move(position, 0)
            self.win.clrtoeol()
        except curses.error:
            pass

    def draw_host(self, position, key):
        """Write the host name (or IP) for ``key`` in the first column."""
        self._put(position, 0, self.ip_to_host(key))

    def draw_downloaded_bytes(self, position, data):
        """Write the humanized ``in`` byte total."""
        self._put(position, 20, humanize_bytes(data["in"]))

    def draw_uploaded_bytes(self, position, data):
        """Write the humanized ``out`` byte total."""
        self._put(position, 35, humanize_bytes(data["out"]))

    def draw_download_speed(self, position, data):
        """Write ``speed_in`` as a rate; blank until it has been set."""
        text = ""
        if "speed_in" in data:
            text = humanize_bytes(data["speed_in"]) + "/s"
        self._put(position, 50, text)

    def draw_upload_speed(self, position, data):
        """Write ``speed_out`` as a rate; blank until it has been set."""
        text = ""
        if "speed_out" in data:
            text = humanize_bytes(data["speed_out"]) + "/s"
        self._put(position, 70, text)
class CurrentSpeedService(threading.Thread):
    """Worker thread that derives per-host transfer speeds.

    Roughly once per second it compares every host's ``in``/``out``
    byte counters with the values seen on the previous pass and stores
    the differences as ``speed_in`` / ``speed_out`` on the host record.
    """

    def __init__(self, stop_event):
        """
        :param stop_event: ``threading.Event`` that ends the loop.
        """
        self.stop_event = stop_event
        # Byte counters seen on the previous pass, keyed like status.data.
        self.cache = {}
        super().__init__()

    def run(self):
        """Thread entry point: sample until ``stop_event`` is set."""
        self.cache = {}
        self.update_current_speed(status)

        # wait() doubles as the one-second pause and returns True as
        # soon as the event is set, so shutdown is not delayed.
        while not self.stop_event.wait(1):
            self.update_current_speed(status)

    def update_current_speed(self, status):
        """Refresh ``speed_in`` / ``speed_out`` for every known host.

        A host seen for the first time only gets its counters remembered;
        its speed fields appear on the next pass.

        :param status: object whose ``data`` attribute maps a key to a
            dict with ``in`` and ``out`` counters.
        """
        # Iterate over a snapshot: another thread may insert new hosts
        # meanwhile, and iterating the live dict would then raise
        # "dictionary changed size during iteration".
        for key, data in list(status.data.items()):
            previous = self.cache.get(key)
            if previous is None:
                self.cache[key] = {"in": data["in"], "out": data["out"]}
                continue

            data["speed_in"] = data["in"] - previous["in"]
            data["speed_out"] = data["out"] - previous["out"]
            previous["in"] = data["in"]
            previous["out"] = data["out"]
def main():
    """Command-line entry point.

    ``-r`` names the file or interface handed to ``StreamService``;
    without it only the help text is printed.  ``-t`` is the screen
    refresh interval in seconds (default 1).  The screen is redrawn in a
    loop until Ctrl-C.
    """
    parser = argparse.ArgumentParser(description='pf log analyzer')
    parser.add_argument('-r', action='store', dest="dest")
    # Parsed once by argparse, so a bad value is reported before the
    # terminal is switched into curses mode.
    parser.add_argument('-t', action='store', dest='sleep', type=float,
                        default=1.0)
    args = parser.parse_args()

    if not args.dest:
        parser.print_help()
        return

    stop_event = threading.Event()

    stream_service = StreamService(args.dest, stop_event)
    stream_service.start()

    speed_service = CurrentSpeedService(stop_event)
    speed_service.start()

    ui = None
    try:
        ui = Ui()
        ui.draw_header()
        while True:
            ui.draw()
            time.sleep(args.sleep)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to quit.
        pass
    finally:
        # Always restore the terminal and tell the workers to stop, also
        # when the UI dies with an unexpected exception.
        if ui is not None:
            ui.stop()
        stop_event.set()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment