Created
October 10, 2019 18:32
-
-
Save joar/ffbb0193ee6525c2119754d6ef86ce71 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import multiprocessing | |
import datetime as dt | |
import logging | |
import time | |
from queue import Empty | |
import pandas as pd | |
from colorama import Fore | |
import pydivert | |
_log = logging.getLogger(__name__) | |
class PacketStats(object): | |
def __init__(self): | |
self.process = multiprocessing.Process(target=self.run, args=()) | |
self.process.daemon = True | |
self.ips = multiprocessing.Manager().list() | |
self.stats_queue = multiprocessing.Manager().Queue() | |
def start(self): | |
self.process.start() | |
_log.info('Dispatched PacketStats process') | |
def stop(self): | |
self.process.terminate() | |
_log.info('Terminated PacketStats process') | |
_log.info('Collected a total of {} IPs'.format(len(self.ips))) | |
def run(self): | |
with pydivert.WinDivert("(udp.SrcPort == 6672 or udp.DstPort == 6672) and ip") as w: | |
for packet in w: | |
dst = packet.ip.dst_addr | |
src = packet.ip.src_addr | |
if packet.is_inbound: | |
remote_addr = src | |
else: | |
remote_addr = dst | |
self.stats_queue.put(dict( | |
remote_addr=remote_addr, | |
ts=dt.datetime.now() | |
)) | |
w.send(packet) | |
def run(): | |
stats_coll = PacketStats() | |
stats_coll.start() | |
packet_stats = [] | |
try: | |
while True: | |
while True: | |
try: | |
stat = stats_coll.stats_queue.get_nowait() | |
packet_stats.append(stat) | |
except Empty: | |
min_ = dt.datetime.now() - dt.timedelta(seconds=1) | |
packet_stats = [ | |
stat | |
for stat in packet_stats | |
if stat["ts"] > min_ | |
] | |
break | |
if len(packet_stats) == 0: | |
continue | |
df = pd.DataFrame(packet_stats) | |
df.ts = pd.to_datetime(df.ts) | |
last_5s = df | |
stats = last_5s.groupby("remote_addr").count() \ | |
.sort_values(["ts"], ascending=False) | |
print(stats) | |
time.sleep(1) | |
except KeyboardInterrupt: | |
stats_coll.stop() | |
with open("stats.json", "w") as fd: | |
json.dump(packet_stats, fd, default=lambda o: o.isoformat()) | |
print(f"{Fore.LIGHTWHITE_EX}Stopped collecting stats{Fore.RESET}") | |
if __name__ == '__main__': | |
logging.basicConfig(level=logging.INFO) | |
try: | |
if not pydivert.WinDivert.is_registered(): | |
pydivert.WinDivert.register() | |
run() | |
finally: | |
if pydivert.WinDivert.is_registered(): | |
pydivert.WinDivert.unregister() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment