Skip to content

Instantly share code, notes, and snippets.

@gvanem
Last active June 28, 2018 20:00
Show Gist options
  • Select an option

  • Save gvanem/55abc461419e6197e53edff04fff86df to your computer and use it in GitHub Desktop.

Select an option

Save gvanem/55abc461419e6197e53edff04fff86df to your computer and use it in GitHub Desktop.
A Python netstat clone. Needs psutil, GeoIP and colour_print.py too (see below)
# Names exported by "from color_print import *".
__all__ = [ 'printer', 'trace', 'color_print', 'textcolor' ]
#
# A simple module for printing with colours to the Windows console.
# Currently used only by the syslog-drop.py script.
#
# By G. Vanem <[email protected]> 2012.
#
import os, sys, inspect
def def_printer(s):
    """Default printer: write `s` plus a newline straight to stdout.

    os.write() only accepts bytes on Python 3, so text is encoded as
    UTF-8 before it is written.
    """
    line = s + '\n'
    if not isinstance(line, bytes):
        line = line.encode("utf-8")
    os.write(FD_STDOUT, line)
def def_textcolor(col):
    """No-op colour setter, used while no console backend is active."""
    return None
# True when running under Python 3 or later.  The numeric version tuple is
# compared; the first character of sys.version would mis-sort for a major
# version of 10 or more.
PY3 = (sys.version_info[0] >= 3)

# Active output hooks; the backend-selection code below may rebind them.
printer = def_printer
textcolor = def_textcolor

orig_color = 0        # console attributes saved by init_console()
have_tty = False
use_colors = 0        # set by init_console()
use_wconio = 1        # (PY3 == 0)
c_hnd = 0             # win32console output handle; 0 when unavailable
TRACE = 0             # non-zero enables trace() output

FD_STDOUT = 1
FD_STDERR = 2
FD_TRACE = FD_STDOUT  # FD_STDERR
# Windows console foreground attribute values (low four bits of the text
# attribute: 1 = blue, 2 = green, 4 = red, 8 = intensity).
FG_BLUE = 1             # was 3, which merely duplicated FG_CYAN
FG_CYAN = 3
FG_RED = 4
FG_BRIGHT_GREEN = 10
FG_BRIGHT_CYAN = 11
FG_BRIGHT_RED = 12
FG_BRIGHT_MAGENTA = 13
FG_BRIGHT_MAGENRA = FG_BRIGHT_MAGENTA   # original (misspelt) name, kept for compatibility
FG_YELLOW = 14
FG_WHITE = 15

# Maps the character following '~' in a color_print() string to a colour.
# Colour 0 makes the textcolor() backends restore the original colour.
color_map = {
    '0': 0,
    '1': FG_CYAN,
    '2': FG_YELLOW,
    '3': FG_BRIGHT_RED,
    '4': FG_BRIGHT_CYAN,
    '5': FG_RED,
    '6': FG_WHITE,
    '7': FG_BRIGHT_GREEN,
}
# Select the console backend.  Each backend defines a printer and a
# text-colour function and installs them in the module-level hooks
# 'printer' and 'textcolor'.
#
# NOTE(review): the win32console backend is only tried when 'use_wconio' is
# initially 0; a failed WConio import does not fall back to it.
if use_wconio:
    try:
        import WConio

        def wconio_printer(s):
            """Print `s` followed by CR-LF through WConio."""
            WConio.cputs(s + "\r\n")

        def wconio_textcolor(col):
            """Set the WConio text attribute; 0 selects the original colour."""
            global textcolor
            if not use_colors:
                return
            if col == 0:
                col = orig_color
            else:
                # Combine with the high nibble of the original attribute.
                col |= orig_color & 0xF0
            try:
                WConio.textattr(col)
            except Exception:
                # Setting the attribute failed; stop trying from now on.
                textcolor = def_textcolor

        printer = wconio_printer
        textcolor = wconio_textcolor
    except ImportError:
        c_hnd = 0
        use_wconio = 0
        print("Failed to import WConio")
else:
    try:
        import win32console

        def wincon_printer(s):
            """Print `s` followed by CR-LF through the console handle.

            On failure the line is printed with print() and the module
            switches to the default printer.
            """
            global printer
            try:
                c_hnd.WriteConsole(s + "\r\n")
            except Exception:
                trace("wincon_printer(): Fallback to stdio.")
                print(s)
                printer = def_printer

        def wincon_textcolor(col):
            """Set the console text attribute; 0 selects the original colour."""
            global textcolor
            if not use_colors:
                return
            if col == 0:
                col = orig_color
            else:
                # Combine with the high nibble of the original attribute.
                col |= orig_color & 0xF0
            try:
                c_hnd.SetConsoleTextAttribute(col)
            except Exception:
                # Setting the attribute failed; stop trying from now on.
                textcolor = def_textcolor

        printer = wincon_printer
        textcolor = wincon_textcolor
        c_hnd = win32console.GetStdHandle(win32console.STD_OUTPUT_HANDLE)
    except ImportError:
        c_hnd = 0
# Tracing is forced off when running under IronPython.
if getattr(getattr(sys, "implementation", None), "name", None) == 'ironpython':
    TRACE = 0
def trace(s):
    """Write a debug line to FD_TRACE, prefixed with the caller's file name
    and line number.  Does nothing unless TRACE is non-zero.
    """
    if not TRACE:
        return
    textcolor(FG_BRIGHT_CYAN)
    caller = sys._getframe(0).f_back
    # getsourcefile() returns None when no source file can be identified;
    # fall back to the file name recorded in the caller's code object.
    src = inspect.getsourcefile(caller) or caller.f_code.co_filename
    text = "%s(%3d): %s\n" % (os.path.basename(src), caller.f_lineno, s)
    if PY3:
        os.write(FD_TRACE, bytes(text, "UTF-8"))
    else:
        os.write(FD_TRACE, text)
    textcolor(0)
####################################################################################
def print_single(c):
    """Write `c` to stdout via os.write(); UTF-8 encoded on Python 3."""
    payload = bytes(c, "UTF-8") if PY3 else "%c" % c
    os.write(FD_STDOUT, payload)
def print_c(s, i):
    """Print the single character s[i]; do nothing when `i` is past the end.

    color_print() calls this with i == len(s) after a colour code that ends
    the string, so the bounds check must cover the console paths too.
    """
    if i >= len(s):
        return
    ch = s[i]
    if use_colors and have_tty:
        if use_wconio:
            WConio.putc(ch)
        else:
            c_hnd.WriteConsole("%s" % ch)
    else:
        print_single(ch)
def color_print(str):
    """Print `str` followed by a newline, interpreting '~' colour escapes.

    '~<c>' switches to colour color_map[<c>]; '~~' prints a literal '~'.
    A '~' followed by an unknown code, or a '~' at the very end of the
    string, is printed literally.  The colour is reset with textcolor(0)
    afterwards.

    On an OSError while writing, a message is written to stderr and the
    process exits with status 1.  Returns 1.
    """
    text = str            # the parameter name (kept for callers) shadows the builtin
    end = len(text)
    pos = 0
    try:
        while pos < end:
            if text[pos] == '~' and pos + 1 < end:
                code = text[pos + 1]
                if code == '~':
                    print_c('~', 0)
                    pos += 2
                    continue
                if code in color_map:
                    textcolor(color_map[code])
                    pos += 2
                    # Re-examine the next character: it may start another escape.
                    continue
            print_c(text, pos)
            pos += 1

        if use_colors and have_tty:
            # 'c_hnd' is only a console handle for the win32console backend.
            if use_wconio:
                WConio.cputs("\r\n")
            else:
                c_hnd.WriteConsole("\r\n")
        else:
            print_single("\n")
    except OSError as e:
        msg = "write error: %s\n" % e
        if not isinstance(msg, bytes):
            msg = msg.encode("utf-8")     # os.write() needs bytes on Python 3
        os.write(FD_STDERR, msg)
        sys.exit(1)
    textcolor(0)
    return 1
####################################################################################
def set_win_title(title=None):
    """Set or restore the console window title.

    With a non-empty `title` the current title is saved in the global
    `win_title` and `title` is installed.  Without one, the saved title is
    restored.

    Returns the saved title, or None when the operation failed (for example
    win32console is not installed or no title has been saved yet).
    """
    global win_title
    try:
        # Imported here: the module level only imports win32console when
        # the WConio backend is disabled.
        import win32console
        if title:
            win_title = win32console.GetConsoleTitle()
            win32console.SetConsoleTitle(title)
            trace("Setting win-title \"%s\"" % title)
        else:
            trace("Restoring win-title \"%s\"" % win_title)
            win32console.SetConsoleTitle(win_title)
        return win_title
    except Exception:
        return None
####################################################################################
def init_console(use_col=1):
    """Initialise colour handling and record the console's original colour.

    use_col: non-zero enables colours; zero also resets `printer` to the
             default stdout printer.

    Returns the result of sys.stdout.isatty().
    """
    global use_colors, printer, orig_color

    # NOTE(review): this binds a local name; the module-level 'have_tty'
    # read by print_c() and color_print() is not updated here -- confirm
    # whether that is intended before changing it.
    have_tty = sys.stdout.isatty()

    use_colors = use_col
    if not use_colors:
        printer = def_printer

    if use_wconio:
        orig_color = WConio.gettextinfo()[4]
    else:
        try:
            hnd = win32console.PyConsoleScreenBufferType(c_hnd)
            orig_color = hnd.GetConsoleScreenBufferInfo()['Attributes']
        except Exception:
            # No usable console (or win32console was never imported):
            # keep the current 'orig_color'.
            pass

    trace("orig_color: %d (%02X)" % (orig_color, orig_color))
    # 'c_hnd' is 0 or a win32console handle object.  The argument is
    # formatted even when tracing is off, and '%d' may raise TypeError for
    # a handle object, so use '%s'.
    trace("c:_hnd %s" % (c_hnd,))
    return have_tty
#!/usr/bin/env python
from __future__ import print_function
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# G. Vanem <gvanem@yahoo-no>, Added features:
# GeoIP, IP2Location support.
# Program filtering.
# Continuous mode.
#
# For reference, the Linux netstat manual is here:
# https://www.computerhope.com/unix/unetstat.htm
#
"""
A clone of 'netstat'.
"""
import sys, os, time, socket, getopt
from socket import AF_INET, SOCK_STREAM, SOCK_DGRAM, gethostbyaddr
import color_print
from color_print import color_print as cprint
import psutil
# True when running under Python 3 or later.  The numeric version tuple is
# compared; the first character of sys.version would mis-sort for a major
# version of 10 or more.
PY3 = (sys.version_info[0] >= 3)

# Command-line controlled settings (see parse_cmd_line()).
numeric = debug = 0
continuous = 0
continuous_wait = 2
show_all = False
show_program = False

# Geo-location state.
geo = None
use_ip2loc = True    # Prefer 'IP2Location' over yucky 'GeoIP'
have_ip2loc = False
ip2loc = None        # The last geo.get_all(ip) object

# Program filter and its match counters, indexed by the two constants below.
only_this_program = None
only_this_program_times = [ 0, 0 ]
_TOTAL_TIMES = 0
_THIS_LOOP = 1

AD = "-"             # placeholder printed for a missing address

# Placeholder object when the socket module has no AF_INET6.
AF_INET6 = getattr(socket, 'AF_INET6', object())

proto_map = {
    (AF_INET, SOCK_STREAM): 'tcp',
    (AF_INET6, SOCK_STREAM): 'tcp6',
    (AF_INET, SOCK_DGRAM): 'udp',
    (AF_INET6, SOCK_DGRAM): 'udp6',
}

# Protocols and address families to show; parse_cmd_line() removes entries.
proto_accept = [ "tcp", "tcp6", "udp", "udp6" ]
family_accept = [ AF_INET, AF_INET6 ]
####################################################################################
def nice_size(size):
    """Return `size` (a byte count) as a short string in B, kB, MB or GB.

    The value is truncated, not rounded, to a whole number of units.
    """
    for limit, unit in ((1024 ** 3, "GB"), (1024 ** 2, "MB"), (1024, "kB")):
        # '>=' so that exact multiples use the larger unit (1024 -> "1 kB").
        if size >= limit:
            return "%d %s" % (size // limit, unit)
    return "%d B" % size
#
# Interface for GeoIP
#
def geoip_get_datadir():
    """Return "$GEOIP_ROOT/data/", or None when GEOIP_ROOT is not set."""
    root = os.environ.get('GEOIP_ROOT')
    if root is None:
        return None
    return root + "/data/"
def geoip_init():
    """Open "GeoIP.dat" from the GeoIP data directory into the global `geo`.

    On failure a message naming the actual cause (module import, missing
    $GEOIP_ROOT, or opening the file) is printed with cprint().
    """
    global geo
    try:
        import GeoIP
    except Exception:
        cprint("Failed to import ~2GeoIP~0. No country info will be printed.\n")
        return

    data_dir = geoip_get_datadir()
    if data_dir is None:
        cprint("No ~2$GEOIP_ROOT~0 is defined. No country info will be printed.\n")
        return

    path = data_dir + "GeoIP.dat"
    try:
        geo = GeoIP.open(path, GeoIP.GEOIP_STANDARD)
    except Exception:
        # NOTE(review): the exception type GeoIP.open() raises is not
        # visible here; any failure is treated as "could not open".
        geo = None
    if geo is None:
        cprint("Failed to open ~2%s~0. No country info will be printed.\n" % path)
def geoip_list_data():
    """Print the GeoIP library version and the *.DAT files (name, size,
    st_ctime timestamp) found in the GeoIP data directory.
    """
    try:
        import GeoIP
        head = " GeoIP version: %s" % GeoIP.lib_version()
    except Exception:
        head = " GeoIP module not found"

    data_dir = geoip_get_datadir()
    if data_dir is None:
        # Nothing to list without a data directory.
        cprint("%s.\n Data-files in \"~2$GEOIP_ROOT/data/~0\" not found." % head)
        return

    cprint("%s.\n Data-files in ~2%s~0:" % (head, data_dir))
    try:
        for f in sorted(os.listdir(data_dir)):
            if not f.upper().endswith('.DAT'):
                continue
            st = os.stat(data_dir + f)
            stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(st.st_ctime))
            cprint(' %-20s %12s %s' % (f, nice_size(st.st_size), stamp))
    except OSError:
        # OSError instead of WindowsError: the latter name only exists on
        # Windows, where it is OSError or a subclass of it.
        cprint(' ~2<None>~0 (GeoIP.dat file not found).')
def get_country(geo, ip):
    """Return the country for `ip`, or '' when it is unknown or `geo` is unset.

    With IP2Location the full lookup result is also stored in the global
    `ip2loc` and the long country name is preferred over the short one.
    With GeoIP the country code is returned.
    """
    global ip2loc

    def as_text(value):
        """Decode Python 3 bytes as UTF-8; return anything else unchanged."""
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode("utf-8")
        return value

    if not geo:
        return ''

    if have_ip2loc:
        ip2loc = geo.get_all(ip)
        # The library may hand back bytes or text; only decode bytes.
        ip2loc.city = as_text(ip2loc.city)
        ip2loc.region = as_text(ip2loc.region)
        country = ip2loc.country_long or ip2loc.country_short
    else:
        country = geo.country_code_by_addr(ip)

    country = as_text(country)
    if not country or country == "-":
        return ''

    # Strip off at commas and apostrophes.
    # E.g. "Korea, Republic of"               -> "Korea"
    #      "Lao People's Democratic Republic" -> "Lao People"
    for sep in (",", "'"):
        cut = country.find(sep)
        if cut > 1:
            return country[:cut]
    return country
#
# Interface for IP2Location
#
def ip2loc_get_datadir():
    """Return "$IP2LOC_ROOT/data/", or None when IP2LOC_ROOT is not set."""
    root = os.environ.get('IP2LOC_ROOT')
    if root is None:
        return None
    return root + "/data/"
def ip2loc_init():
    """Open the IP2Location database and install it as the global `geo`.

    Returns True on success (and sets `have_ip2loc`).  On failure a message
    is printed with cprint(), `geo` is left untouched and False is returned.
    """
    global geo, have_ip2loc

    data_dir = ip2loc_get_datadir()
    if not data_dir:
        cprint("No ~2$IP2LOC_ROOT~0 is defined.")
        return False

    # The original code also had a disabled branch naming "IP-COUNTRY.BIN".
    db_file = data_dir + "IP2LOCATION-LITE-DB9.IPV6.BIN"
    if not os.path.exists(db_file):
        cprint("Failed to find ~2%s~0." % db_file)
        return False

    try:
        import IP2Location
    except Exception:
        cprint("Failed to import ~2IP2Location~0.")
        return False

    try:
        db = IP2Location.IP2Location(db_file)
    except Exception:
        # Report a failing constructor as an initialisation problem, not as
        # a failed import.
        db = None
    if not db:
        cprint("Failed to initialise IP2Location.")
        return False

    geo = db
    have_ip2loc = True
    return True
def ip2loc_list_data():
    """Print the *.BIN files (name, size, st_ctime timestamp) found in the
    IP2Location data directory, preceded by a note when the IP2Location
    module cannot be imported.
    """
    try:
        import IP2Location        # availability probe only
    except Exception:
        print(" IP2Location module not found.")

    data_dir = ip2loc_get_datadir()
    if data_dir is None:
        # Nothing to list without a data directory.
        cprint(" Data-files in \"~2$IP2LOC_ROOT/data/~0\" not found.")
        return

    cprint(" Data-files in ~2%s~0:" % data_dir)
    try:
        for f in sorted(os.listdir(data_dir)):
            if not f.upper().endswith(".BIN"):
                continue
            st = os.stat(data_dir + f)
            stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(st.st_ctime))
            cprint(" %-30s %12s %s" % (f, nice_size(st.st_size), stamp))
    except OSError:
        # OSError instead of WindowsError: the latter name only exists on
        # Windows, where it is OSError or a subclass of it.
        cprint(" ~2<None>~0 (IP2*.BIN files not found).")
####################################################################################
def host_name(addr):
    """Return a displayable host name for `addr`.

    In numeric mode `addr` is returned unchanged.  Otherwise a reverse
    lookup is attempted; when it fails the textual address is used.
    Names longer than 33 characters are cut to ".." plus their last 33
    characters.  Ctrl-C during the lookup exits with status 1.
    """
    if numeric:
        return addr
    name = str(addr)
    if name == "0.0.0.0":
        return name
    try:
        name = gethostbyaddr(name)[0]
    except (socket.herror, socket.gaierror):
        # A tuple is required here: "herror or gaierror" evaluates to
        # herror alone and would let gaierror escape.
        pass
    except KeyboardInterrupt:
        cprint("\nCtrl-C")
        sys.exit(1)

    #
    # Shorten long names such as
    #   a193-212-178-9.deploy.akamaitechnologies.com
    # by keeping only the tail, prefixed with "..".
    #
    if len(name) > 33:
        return ".." + name[-33:]
    return name
def addr_port(af, addr, port):
    """Format an endpoint as "addr:port"; IPv6 addresses are bracketed."""
    layout = "[%s]:%s" if af == AF_INET6 else "%s:%s"
    return layout % (addr, port)
loops = 0    # number of do_stop() calls made in continuous mode


def do_stop():
    """Tell main() whether the current pass is the last one.

    Returns 1 when not in continuous mode.  In continuous mode it returns 0;
    from the second call on it first prints a "----" separator (followed by
    the per-loop match count when filtering on a single program) and sleeps
    `continuous_wait` seconds.  Ctrl-C during the sleep exits with status 1.
    """
    global loops
    if continuous == 0:
        return 1

    loops += 1
    if loops <= 1:
        return 0

    filtering = only_this_program and only_this_program[0] != '!'
    suffix = (" %d" % only_this_program_times[_THIS_LOOP]) if filtering else ""
    cprint("----" + suffix)
    try:
        time.sleep(continuous_wait)
    except KeyboardInterrupt:
        cprint("\nCtrl-C")
        sys.exit(1)
    return 0
def main():
    """Print the connection table once, or repeatedly in continuous mode.

    Each accepted connection is printed with cprint() and counted in
    `only_this_program_times`.
    """
    if numeric:
        header = "~6%-5s %-22s %-22s %2.2s %-12s %-22s %s %s~0"
        # The pid column uses '%5s': psutil reports pid as None when it
        # cannot be retrieved, which '%5d' cannot format.
        templ = "~2%-5s~0 %-22s %-22s %2.2s ~6%-12s ~2%-20s ~0%5s %s"
    else:
        header = "~6Resolving addresses takes time, hang on...\n%-5s %-22s %-42s %2.2s %-12s %-22s %s %s~0"
        templ = "~2%-5s~0 %-22s %-42s %2.2s ~6%-12s ~2%-20s ~0%5s %s"

    if have_ip2loc:
        cprint(header % ("Proto", "Local addr", "Remote addr", "", "Status", "Program name", "Pid", "Country (City/Region)"))
    else:
        cprint(header % ("Proto", "Local addr", "Remote addr", "CC", "Status", "Program name", "Pid", ""))

    proc_names = {}

    #########################################################################

    def handle_connection(c):
        """Return (accepted, program_name, protocol) for connection `c`."""
        rejected = (False, None, None)
        if c.family not in family_accept:
            return rejected
        if not show_all and c.status not in ["ESTABLISHED", "TIME_WAIT"]:
            return rejected

        p_name = proc_names.get(c.pid, '?').lower()
        if only_this_program:
            if only_this_program[0] == '!':
                # '!program': hide this program, show everything else.
                if p_name == only_this_program[1:]:
                    return rejected
            elif p_name != only_this_program:
                return rejected

        protocol = proto_map[(c.family, c.type)]
        if protocol not in proto_accept:
            return rejected
        return True, p_name, protocol

    #########################################################################

    def get_remote_info(c):
        """Return (remote_address_or_None, short_country, long_country)."""
        raddr = None
        short_country = ""
        long_country = ""

        if c.raddr:
            ip, port = c.raddr
            raddr = addr_port(c.family, host_name(ip), port)
            country = get_country(geo, ip)
            if have_ip2loc:
                long_country = country
            else:
                short_country = country

        if have_ip2loc and long_country != "":
            # 'ip2loc' holds the lookup made by get_country() just above.
            if ip2loc.city and ip2loc.city != "-":
                long_country += " (" + ip2loc.city + ")"
        return raddr, short_country, long_country

    done = 0
    while not done:
        done = do_stop()
        only_this_program_times[_THIS_LOOP] = 0

        for p in psutil.process_iter():
            try:
                proc_names[p.pid] = p.name()
            except psutil.Error:
                pass
            except KeyboardInterrupt:
                cprint("\nCtrl-C")
                sys.exit(1)

        for c in psutil.net_connections(kind='inet'):
            handle, p_name, protocol = handle_connection(c)
            if not handle:
                continue

            laddr = addr_port(c.family, c.laddr[0], c.laddr[1])
            raddr, short_country, long_country = get_remote_info(c)
            pid = c.pid if c.pid is not None else AD
            cprint(templ % (protocol, laddr, raddr or AD,
                            short_country, c.status, p_name,
                            pid, long_country))
            only_this_program_times[_TOTAL_TIMES] += 1
            only_this_program_times[_THIS_LOOP] += 1
def final_report():
    """Print how often the selected program was seen in the connection table.

    Nothing is printed without a program filter or for a '!program' filter.
    """
    if not only_this_program or only_this_program[0] == '!':
        return
    total = only_this_program_times[_TOTAL_TIMES]
    print("%s found %d times in connection-table" % (only_this_program, total))
def some_examples():
    """Return the usage examples as text that starts and ends with a newline."""
    examples = (
        "netstat -4tp chrome - show all connected IPv4 TCP ports from programs matching 'chrome'.",
        "netstat -uanp chrome - show all unconnected IPv4/IPv6 UDP ports from programs matching 'chrome'.",
        "netstat -4ncp !chrome - continuously show IPv4 addresses numerically from all programs not matching 'chrome'",
        "netstat -an|grep 'San Francisco' - show all ports for remote locations matching 'San Francisco' (needs IP2Location).",
    )
    return "\n" + "\n".join(examples) + "\n"
def usage(e=None, detailed=False):
    """Print the usage text and exit.

    e:        optional error message printed before the usage text.
    detailed: also list the geo-location data files and some examples.

    Exits with status 1 when an error message was given, otherwise 0.
    """
    if e:
        print(e)
    print("""Usage: %s [options] <program | !program>
-h - show this help.
-n - show address numerically.
-c - show information every %d second continuously (^C to stop).
-d - turn on some debug.
-a - show all endpoints.
-p - show the program (see below).
-t - show only TCP endpoints.
-u - show only UDP endpoints.
-4 - show only IPv4 endpoints.
-6 - show only IPv6 endpoints.
If '-p' and 'program' is specified, show only endpoints for this 'program'.
If '-p' and '!program' is specified, show only endpoints NOT for this 'program'.
""" % (sys.argv[0], continuous_wait))
    if detailed:
        if use_ip2loc:
            ip2loc_list_data()
        else:
            geoip_list_data()
        print("\n Examples: %s" % some_examples(), end="")
    # A usage error must not look like success to the calling shell.
    sys.exit(1 if e else 0)
def parse_cmd_line():
    """Parse sys.argv into the module-level settings and set up the console.

    An invalid option, or '-h', ends in usage(), which exits.  A remaining
    argument is taken as the program filter when '-p' was given.
    """
    global numeric, continuous, show_all, show_program, debug, only_this_program

    def drop(seq, *items):
        """Remove each of `items` from `seq` when present, so that a
        repeated option cannot raise ValueError."""
        for item in items:
            if item in seq:
                seq.remove(item)

    do_help = 0
    try:
        opts, args = getopt.getopt(sys.argv[1:], "hncdaptu46")
    except getopt.GetoptError as e:
        usage("Error: %s" % e.msg)       # does not return

    for o, _ in opts:
        if o == "-h":
            do_help = 1
            break
        elif o == "-n":
            numeric = 1
        elif o == "-c":
            continuous = 1
        elif o == "-a":
            show_all = True
        elif o == "-p":
            show_program = True
        elif o == "-t":
            drop(proto_accept, "udp", "udp6")
        elif o == "-u":
            drop(proto_accept, "tcp", "tcp6")
        elif o == '-d':
            debug = color_print.TRACE = 1
        elif o == '-4':
            drop(family_accept, AF_INET6)
        elif o == '-6':
            drop(family_accept, AF_INET)

    color_print.init_console()
    if not sys.stdout.isatty():
        color_print.trace("isatty() == 0; Cannot use Windows Console.")

    if do_help:
        usage(detailed=True)

    if args:
        if not show_program:
            print("Ignoring the remaining cmd-line. Did you forget to use the '-p' option?")
        else:
            only_this_program = args[0].lower()
            # Process names only carry an ".exe" suffix on Windows.
            if os.name == 'nt' and not only_this_program.endswith(".exe"):
                only_this_program += ".exe"
def init():
    """Set up geo-location: IP2Location when preferred and usable, else GeoIP."""
    if use_ip2loc and not ip2loc_init():
        cprint("Trying GeoIP.")
    if not have_ip2loc:
        geoip_init()
# Run only when executed as a script, so that importing this module has no
# side effects.
if __name__ == "__main__":
    parse_cmd_line()
    init()
    main()
    final_report()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment