Skip to content

Instantly share code, notes, and snippets.

@sourceperl
Created December 20, 2017 08:45
Show Gist options
  • Save sourceperl/d48b295795ebffac471a4a1c0e986113 to your computer and use it in GitHub Desktop.
Save sourceperl/d48b295795ebffac471a4a1c0e986113 to your computer and use it in GitHub Desktop.
Auto-build a Modbus/TCP flow map graph on a Tk canvas (with Scapy)
#!/usr/bin/env python3
# auto-build modbus flow map graph on a tk canvas
import threading
import tkinter as tk
from scapy.all import *
import networkx as nx
# shared flow graph: nodes are IP addresses, a directed edge src -> dst is
# one observed Modbus/TCP flow (filled by process_pkt, drawn by DrawApp)
g = nx.DiGraph()
# guards g, which is written by the sniffer thread and read by the Tk thread
th_lock = threading.Lock()
def process_pkt(pkt):
    """Record a Modbus/TCP flow from a sniffed packet in the shared graph.

    Used as the ``prn`` callback of scapy's ``sniff``.  A packet is kept
    when it has an IP layer and a TCP layer whose destination port is 502;
    the edge ``source IP -> destination IP`` is then added to the
    module-level graph ``g``.  Every other packet is ignored.

    :param pkt: packet object handed over by scapy
    """
    # TCP can also ride on IPv6: without the IP check, pkt[IP] fails for
    # such packets inside the sniffer thread
    if IP not in pkt or TCP not in pkt:
        return
    if pkt[TCP].dport != 502:
        return
    # the graph is shared with the Tk thread, so mutate it under the lock
    with th_lock:
        g.add_edge(pkt[IP].src, pkt[IP].dst)
def start_sniff():
    """Capture traffic with scapy and feed each packet to ``process_pkt``.

    Intended as the target of a background thread: the call to ``sniff``
    is given neither a packet count nor a timeout.
    """
    # process_pkt only keeps TCP packets sent to port 502, so let the capture
    # filter drop everything else before it reaches Python;
    # store=False keeps scapy from accumulating the packets in memory
    sniff(prn=process_pkt, filter='tcp dst port 502', store=False)
class DrawApp(tk.Tk):
    """Main window showing the sniffed Modbus/TCP flows as a graph.

    Hosts are drawn as labelled yellow discs and every recorded flow as a
    blue arrow from source to destination.  The picture is rebuilt from the
    module-level graph ``g`` at start-up and on each click on 'update'.
    """

    def __init__(self, *args, **kwargs):
        """Build the window: a 500x500 canvas above 'update'/'exit' buttons.

        All arguments are passed through to ``tk.Tk``.
        """
        super().__init__(*args, **kwargs)
        # configure main window
        self.wm_title('Modbus/TCP flow map')
        # canvas
        self.can = tk.Canvas(self, bg='grey', width=500, height=500)
        self.can.grid(row=0, columnspan=2)
        # a bound method is a valid Tk callback, no lambda wrapper needed
        tk.Button(self, text='update', command=self.update_canvas).grid(row=1, column=0)
        # destroy() ends mainloop cleanly; the exit() builtin is a helper
        # installed by the site module and is not guaranteed to exist
        tk.Button(self, text='exit', command=self.destroy).grid(row=1, column=1)
        # first drawing
        self.update_canvas()

    def update_canvas(self):
        """Redraw the whole flow map from the current content of ``g``."""
        # wipe all
        self.can.delete('all')
        # copy the graph under the lock so the sniffer thread is only held
        # up for the copy, not for the layout computation and the drawing
        with th_lock:
            flows = g.copy()
        # auto-build layout around the middle of the 500x500 canvas
        pos = nx.spring_layout(flows, center=(250, 250), scale=100)
        # draw all nodes as rounds
        for c_node in flows.nodes:
            self.draw_node(pos[c_node], name=c_node, radius=20, color='yellow')
        # draw all edges as arrows
        for src, dst in flows.edges():
            self.can.create_line(tuple(pos[src]), tuple(pos[dst]), width=3,
                                 fill='blue', arrow=tk.LAST, arrowshape=(16, 20, 6))
        # auto-refresh is disabled; to enable it:
        # self.after(1000, func=self.update_canvas)

    def draw_node(self, pos, name='', radius=8, color='green'):
        """Draw a filled disc centred on ``pos`` with its label above it.

        :param pos: (x, y) centre of the node on the canvas
        :param name: text of the label
        :param radius: radius of the disc in pixels
        :param color: fill colour of the disc
        """
        (x, y) = pos
        (x0, y0) = (x - radius, y - radius)
        (x1, y1) = (x + radius + 1, y + radius + 1)
        self.can.create_oval(x0, y0, x1, y1, fill=color)
        # label is centred horizontally, one radius above the top of the disc
        self.can.create_text(x0 + radius, y0 - radius, text=name)

    def draw_link(self, pos_from, pos_to, width=2, color='blue'):
        """Draw a plain line (no arrow head) between two canvas positions.

        :param pos_from: (x, y) start of the line
        :param pos_to: (x, y) end of the line
        :param width: line width in pixels
        :param color: line colour
        """
        self.can.create_line(pos_from, pos_to, width=width, fill=color)
if __name__ == '__main__':
    # capture runs in a daemon thread, so it cannot keep the process alive
    # once the GUI has been closed
    th_sniff = threading.Thread(target=start_sniff, daemon=True)
    th_sniff.start()
    # the Tk event loop owns the main thread
    app = DrawApp()
    app.mainloop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment