Last active
June 28, 2018 20:00
-
-
Save gvanem/55abc461419e6197e53edff04fff86df to your computer and use it in GitHub Desktop.
A Python netstat clone. Needs psutil, GeoIP and colour_print.py too (see below)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| __all__ = [ 'printer', 'trace', 'color_print', 'textcolor' ] | |
| # | |
| # A simple module for printing with colours to the Windows console. | |
| # Currently used only by the syslog-drop.py script. | |
| # | |
| # By G. Vanem <[email protected]> 2012. | |
| # | |
| import os, sys, inspect | |
def def_printer (s):
    """
    Fallback printer: write `s` plus a newline to the stdout descriptor.

    `os.write()` only accepts bytes on Python 3, so text is UTF-8 encoded
    before it is written. A value that already is bytes (which includes a
    plain Python 2 `str`) is written unchanged.
    """
    if isinstance (s, bytes):
        data = s + b'\n'
    else:
        data = (s + '\n').encode ("utf-8")
    os.write (FD_STDOUT, data)
def def_textcolor (col):
    """No-op colour setter; `col` is accepted and ignored."""
    return None
# True on Python 3 and later. The numeric major version is compared;
# comparing the first character of `sys.version` as a string would
# misfire for a two-digit major version.
PY3 = (sys.version_info[0] >= 3)

# Output hooks, initially bound to the plain fallbacks.
printer   = def_printer
textcolor = def_textcolor
# Module state.
orig_color = 0        # console attribute to restore for colour 0
have_tty   = False
use_colors = 0
use_wconio = 1        # (PY3 == 0)
c_hnd      = 0        # console handle; 0 means "none"
TRACE      = 0        # non-zero enables trace() output

# File descriptors used with os.write().
FD_STDOUT = 1
FD_STDERR = 2
FD_TRACE  = FD_STDOUT # FD_STDERR

# Foreground attribute values (Windows console numbering:
# blue = 1, green = 2, red = 4, intensity = 8).
FG_BLUE           = 1   # was 3, which is cyan (blue | green)
FG_CYAN           = 3
FG_RED            = 4
FG_BRIGHT_GREEN   = 10
FG_BRIGHT_CYAN    = 11
FG_BRIGHT_RED     = 12
FG_BRIGHT_MAGENTA = 13
FG_BRIGHT_MAGENRA = FG_BRIGHT_MAGENTA   # misspelled original name, kept for compatibility
FG_YELLOW         = 14
FG_WHITE          = 15

# Maps the character following '~' in a color_print() string to an
# attribute value; '0' stands for "restore the original colour".
color_map = { '0': 0,
              '1': FG_CYAN,
              '2': FG_YELLOW,
              '3': FG_BRIGHT_RED,
              '4': FG_BRIGHT_CYAN,
              '5': FG_RED,
              '6': FG_WHITE,
              '7': FG_BRIGHT_GREEN,
            }
#
# Pick the console backend. With `use_wconio` set, WConio is tried;
# otherwise win32console is tried. If the chosen module cannot be
# imported, the default printer/textcolor hooks stay in place.
#
if use_wconio:
    try:
        import WConio

        def wconio_printer (s):
            """Print `s` followed by CR-LF through WConio."""
            WConio.cputs (s + "\r\n")

        def wconio_textcolor (col):
            """
            Set the text attribute through WConio.

            `col == 0` restores `orig_color`; any other value is combined
            with the high nibble of `orig_color` (the background bits of a
            Windows console attribute). Does nothing unless `use_colors`
            is set. On failure the module falls back to the no-op
            `def_textcolor`.
            """
            global textcolor
            if not use_colors:
                return
            if col == 0:
                col = orig_color
            else:
                col |= orig_color & 0xF0
            try:
                WConio.textattr (col)
            except Exception:
                textcolor = def_textcolor

        printer   = wconio_printer
        textcolor = wconio_textcolor

    except ImportError:
        c_hnd = 0
        use_wconio = 0
        print ("Failed to import WConio")
else:
    try:
        import win32console

        def wincon_printer (s):
            """
            Print `s` followed by CR-LF through the console handle.

            If that fails, `s` is printed with print() and the module
            switches to `def_printer` for subsequent output.
            """
            global printer
            try:
                c_hnd.WriteConsole (s + "\r\n")
            except Exception:
                trace ("wincon_printer(): Fallback to stdio.")
                print (s)
                printer = def_printer

        def wincon_textcolor (col):
            """
            Set the text attribute through the console handle.

            Same colour rules as the WConio variant: 0 restores
            `orig_color`, other values keep the high nibble of
            `orig_color`. On failure the module falls back to the no-op
            `def_textcolor`.
            """
            global textcolor
            if not use_colors:
                return
            if col == 0:
                col = orig_color
            else:
                col |= orig_color & 0xF0
            try:
                c_hnd.SetConsoleTextAttribute (col)
            except Exception:
                textcolor = def_textcolor

        printer   = wincon_printer
        textcolor = wincon_textcolor
        c_hnd     = win32console.GetStdHandle (win32console.STD_OUTPUT_HANDLE)

    except ImportError:
        c_hnd = 0

# Tracing is forced off under IronPython.
if hasattr(sys,"implementation") and sys.implementation.name == 'ironpython':
    TRACE = 0
def trace (s):
    """
    Write the debug message `s` to `FD_TRACE` when `TRACE` is non-zero.

    The message is prefixed with the base name of the caller's source
    file and the caller's line number, and is shown in bright cyan.
    """
    if not TRACE:
        return
    textcolor (FG_BRIGHT_CYAN)
    caller = sys._getframe (1)
    # getsourcefile() returns None when no source file can be determined;
    # fall back to the file name recorded in the code object.
    src = inspect.getsourcefile (caller) or caller.f_code.co_filename
    msg = "%s(%3d): %s\n" % (os.path.basename(src), caller.f_lineno, s)
    if PY3:
        os.write (FD_TRACE, bytes(msg,"UTF-8"))
    else:
        os.write (FD_TRACE, msg)
    textcolor (0)
| #################################################################################### | |
def print_single (c):
    """Write `c` directly to the stdout descriptor (UTF-8 bytes on Python 3)."""
    payload = bytes(c,"UTF-8") if PY3 else "%c" % c
    os.write (FD_STDOUT, payload)
def print_c (s, i):
    """
    Print the single character `s[i]`.

    With colours enabled on a tty the character goes through the active
    console backend (WConio or the console handle); otherwise it is
    written with print_single(). An index at or past the end of `s` is
    ignored on every path, not only the plain one.
    """
    if i >= len(s):
        return
    ch = s[i]
    if use_colors and have_tty:
        if use_wconio:
            WConio.putc (ch)
        else:
            c_hnd.WriteConsole ("%s" % ch)
    else:
        print_single (ch)
def color_print (str):
    """
    Print `str` followed by a newline, interpreting '~' colour escapes.

    '~' followed by a key of `color_map` switches the text colour,
    '~~' prints a literal '~'. A '~' followed by any other character, or
    a '~' at the very end of the string, is printed verbatim. The colour
    is reset with textcolor(0) before returning.

    Returns 1. On an OSError while writing, a message is written to
    `FD_STDERR` and the process exits with status 1.
    """
    text = str
    length = len (text)
    i = 0
    try:
        while i < length:
            if text[i] == '~' and i + 1 < length:
                code = text[i+1]
                if code == '~':
                    print_c (code, 0)
                    i += 2
                    continue
                if code in color_map:
                    textcolor (color_map[code])
                    # Re-examine the next character so that back-to-back
                    # escapes such as "~2~0" are both interpreted.
                    i += 2
                    continue
            print_c (text, i)
            i += 1

        if use_colors and have_tty:
            # Use the same backend print_c() writes to.
            if use_wconio:
                WConio.cputs ("\r\n")
            else:
                c_hnd.WriteConsole ("\r\n")
        else:
            print_single ("\n")
    except OSError as e:
        msg = "write error: %s\n" % e
        if not isinstance (msg, bytes):
            msg = msg.encode ("utf-8")   # os.write() needs bytes on Python 3
        os.write (FD_STDERR, msg)
        sys.exit (1)
    textcolor (0)
    return (1)
| #################################################################################### | |
def set_win_title (title = None):
    """
    Set or restore the console window title.

    With a `title`, the current title is saved in the global `win_title`
    and the new one is set. Without a `title`, the saved title is
    restored. Returns the saved title, or None if anything fails (for
    instance when win32console is not available or no title was saved).
    """
    global win_title
    try:
        if title:
            win_title = win32console.GetConsoleTitle()
            win32console.SetConsoleTitle (title)
            trace ("Setting win-title \"%s\"" % title)
        else:
            trace ("Restoring win-title \"%s\"" % win_title)
            win32console.SetConsoleTitle (win_title)
        return (win_title)
    except Exception:
        # Best effort only; Ctrl-C and SystemExit are no longer swallowed.
        return (None)
| #################################################################################### | |
def init_console (use_col = 1):
    """
    Initialise console state and record the original text attribute.

    `use_col` becomes the global `use_colors`; when it is false the
    printer hook is reset to `def_printer`. `orig_color` is read from
    WConio or, failing that, from the win32console screen buffer (best
    effort). Returns whether sys.stdout is a tty.
    """
    global use_colors, printer, orig_color

    # NOTE(review): this is a local; the module-level `have_tty` stays
    # False, so print_c()/color_print() always take the os.write() path.
    # Making it global would route output through `c_hnd`, which is 0 in
    # the WConio configuration -- confirm the intent before changing.
    have_tty = sys.stdout.isatty()

    use_colors = use_col
    if not use_colors:
        printer = def_printer

    if use_wconio:
        orig_color = WConio.gettextinfo()[4]
    else:
        try:
            hnd = win32console.PyConsoleScreenBufferType (c_hnd)
            orig_color = hnd.GetConsoleScreenBufferInfo()['Attributes']
        except Exception:
            pass   # keep the current orig_color

    trace ("orig_color: %d (%02X)" % (orig_color, orig_color))
    # '%s' instead of '%d': c_hnd is either 0 or a console handle object.
    trace ("c_hnd: %s" % (c_hnd,))
    return (have_tty)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| from __future__ import print_function | |
| # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. | |
| # Use of this source code is governed by a BSD-style license that can be | |
| # found in the LICENSE file. | |
| # | |
| # G. Vanem <gvanem@yahoo-no>, Added features: | |
| # GeoIP, IP2Location support. | |
| # Program filtering. | |
| # Continuous mode. | |
| # | |
| # For reference, the Linux netstat manual is here: | |
| # https://www.computerhope.com/unix/unetstat.htm | |
| # | |
| """ | |
| A clone of 'netstat'. | |
| """ | |
| import sys, os, time, socket, getopt | |
| from socket import AF_INET, SOCK_STREAM, SOCK_DGRAM, gethostbyaddr | |
| import color_print | |
| from color_print import color_print as cprint | |
| import psutil | |
| PY3 = (sys.version[0] >= '3') | |
| numeric = debug = 0 | |
| continuous = 0 | |
| continuous_wait = 2 | |
| show_all = False | |
| show_program = False | |
| geo = None | |
| use_ip2loc = True # Prefer 'IP2Locatation' over yucky 'GeoIP' | |
| have_ip2loc = False | |
| ip2loc = None # The last geo.get_all(ip) object | |
| only_this_program = None | |
| only_this_program_times = [ 0, 0 ] | |
| _TOTAL_TIMES = 0 | |
| _THIS_LOOP = 1 | |
| AD = "-" | |
| AF_INET6 = getattr(socket, 'AF_INET6', object()) | |
| proto_map = { (AF_INET, SOCK_STREAM) : 'tcp', | |
| (AF_INET6, SOCK_STREAM) : 'tcp6', | |
| (AF_INET, SOCK_DGRAM) : 'udp', | |
| (AF_INET6, SOCK_DGRAM) : 'udp6' } | |
| proto_accept = [ "tcp", "tcp6", "udp", "udp6" ] | |
| family_accept = [ AF_INET, AF_INET6 ] | |
| #################################################################################### | |
def nice_size (size):
    """
    Return `size` (a byte count) as a short string in B, kB, MB or GB.

    The value is truncated to a whole number of the chosen unit. A size
    exactly on a unit boundary uses the larger unit (1024 -> "1 kB").
    """
    for limit, unit in ((1024**3, "GB"), (1024**2, "MB"), (1024, "kB")):
        if size >= limit:
            return ("%d %s" % (size // limit, unit))
    return ("%d B" % size)
| # | |
| # Interface for GeoIP | |
| # | |
def geoip_get_datadir():
    """Return "$GEOIP_ROOT/data/", or None when GEOIP_ROOT is not set."""
    root = os.environ.get ('GEOIP_ROOT')
    if root is None:
        return None
    return root + "/data/"
def geoip_init():
    """
    Open "GeoIP.dat" from the GeoIP data directory and store the handle
    in the global `geo`.

    A message is printed and `geo` is left unusable when the GeoIP
    module cannot be imported, when $GEOIP_ROOT is not set, or when the
    data file cannot be opened. Each case gets its own message instead
    of all being reported as an import failure.
    """
    global geo
    try:
        import GeoIP
    except ImportError:
        cprint ("Failed to import ~2GeoIP~0. No country info will be printed.\n")
        return

    data_dir = geoip_get_datadir()
    if data_dir is None:
        cprint ("No ~2$GEOIP_ROOT~0 is defined. No country info will be printed.\n")
        return

    file = data_dir + "GeoIP.dat"
    try:
        geo = GeoIP.open (file, GeoIP.GEOIP_STANDARD)
    except Exception:
        geo = None
    if geo is None:
        cprint ("Failed to open ~2%s~0. No country info will be printed.\n" % file)
def geoip_list_data():
    """
    Print the GeoIP library version (if the module imports) and list the
    '*.dat' files in the GeoIP data directory with size and time stamp.
    """
    try:
        import GeoIP
        head = " GeoIP version: %s" % GeoIP.lib_version()
    except Exception:
        head = " GeoIP module not found"

    data_dir = geoip_get_datadir()
    if data_dir is None:
        cprint ("%s.\n Data-files in \"~2$GEOIP_ROOT/data/~0\" not found." % head)
        return    # nothing to list; never call os.listdir(None)

    cprint ("%s.\n Data-files in ~2%s~0:" % (head, data_dir))
    try:
        for f in sorted (os.listdir(data_dir)):
            if f.upper().endswith('.DAT'):
                st   = os.stat (data_dir + f)
                size = nice_size (st.st_size)
                tim  = time.strftime ('%Y-%m-%d %H:%M:%S', time.localtime(st.st_ctime))
                cprint (' %-20s %12s %s' % (f, size, tim))
    except OSError:
        # OSError exists on every platform; WindowsError is only defined
        # on Windows, where it is an OSError as well.
        cprint (' ~2<None>~0 (GeoIP.dat file not found).')
    except KeyError:
        # Retained from the original; presumably guards a KeyError raised
        # by cprint() for an unknown '~' code in a file name -- TODO confirm.
        cprint (' ~2<None>')
def get_country (geo, ip):
    """
    Return a short country name (or country code) for `ip`, or '' when
    `geo` is not set or no country is known.

    With IP2Location active the full record is kept in the global
    `ip2loc` (its city and region converted to text) and the long
    country name is preferred over the short one. Otherwise the GeoIP
    country code is used. Values are decoded from UTF-8 only when they
    actually are bytes, so libraries returning `str` (or None) on
    Python 3 do not cause an AttributeError.
    """
    global ip2loc

    if not geo:
        return ''

    def as_text (value):
        # Decode bytes on Python 3; leave str, None and Python 2 values alone.
        if PY3 and isinstance (value, bytes):
            return value.decode ("utf-8")
        return value

    if have_ip2loc:
        ip2loc = geo.get_all (ip)
        ip2loc.city   = as_text (ip2loc.city)
        ip2loc.region = as_text (ip2loc.region)
        country = ip2loc.country_long or ip2loc.country_short
    else:
        country = geo.country_code_by_addr (ip)

    country = as_text (country)
    if not country or country == "-":
        return ''

    # Strip off at commas and apostrophes.
    # E.g. "Korea, Republic of"               -> "Korea"
    #      "Lao People's Democratic Republic" -> "Lao People"
    #
    for sep in (",", "'"):
        c = country.find (sep)
        if c > 1:
            return country[:c]
    return country
| # | |
| # Interface for IP2Location | |
| # | |
def ip2loc_get_datadir():
    """Return "$IP2LOC_ROOT/data/", or None when IP2LOC_ROOT is not set."""
    root = os.environ.get ('IP2LOC_ROOT')
    if root is None:
        return None
    return root + "/data/"
def ip2loc_init():
    """
    Open the IP2Location database and store it in the global `geo`.

    Returns True and sets `have_ip2loc` on success. Returns False, after
    printing the reason, when $IP2LOC_ROOT is unset, the database file is
    missing, the module cannot be imported, or the database cannot be
    opened. `geo` is only assigned on success.
    """
    global geo, have_ip2loc

    _dir = ip2loc_get_datadir()
    if not _dir:
        cprint ("No ~2$IP2LOC_ROOT~0 is defined.")
        return False

    # The original also carried a disabled alternative, "IP-COUNTRY.BIN".
    file = _dir + "IP2LOCATION-LITE-DB9.IPV6.BIN"
    if not os.path.exists(file):
        cprint ("Failed to find ~2%s~0." % file)
        return False

    try:
        import IP2Location
    except Exception:
        cprint ("Failed to import ~2IP2Location~0.")
        return False

    try:
        db = IP2Location.IP2Location (file)
    except Exception:
        db = None
    if not db:
        cprint ("Failed to initialise IP2Location.")
        return False

    geo = db
    have_ip2loc = True
    return True
def ip2loc_list_data():
    """
    Report whether the IP2Location module imports and list the '*.BIN'
    files in the IP2Location data directory with size and time stamp.
    """
    try:
        import IP2Location
        intro = ""
    except Exception:
        intro = " IP2Location module not found.\n"
    print (intro, end="")

    data_dir = ip2loc_get_datadir()
    if data_dir is None:
        cprint (" Data-files in \"~2$IP2LOC_ROOT/data/~0\" not found.")
        return    # nothing to list; never call os.listdir(None)

    cprint (" Data-files in ~2%s~0:" % data_dir)
    try:
        for f in sorted (os.listdir(data_dir)):
            if f.upper().endswith(".BIN"):
                st   = os.stat (data_dir + f)
                size = nice_size (st.st_size)
                tim  = time.strftime ('%Y-%m-%d %H:%M:%S', time.localtime(st.st_ctime))
                cprint (" %-30s %12s %s" % (f, size, tim))
    except OSError:
        # OSError exists on every platform; WindowsError is only defined
        # on Windows, where it is an OSError as well.
        cprint (" ~2<None>~0 (IP2*.BIN files not found).")
    except KeyError:
        # Retained from the original; presumably guards a KeyError raised
        # by cprint() for an unknown '~' code in a file name -- TODO confirm.
        cprint (" ~2<None>")
| #################################################################################### | |
def host_name (addr):
    """
    Return a display name for `addr`.

    In numeric mode `addr` is returned unchanged. Otherwise a reverse
    lookup is attempted (except for "0.0.0.0"); if it fails the textual
    address is used. Names longer than 33 characters are shortened to
    ".." plus their last 33 characters. Ctrl-C during the lookup ends
    the program with status 1.
    """
    if numeric:
        return addr

    name = str (addr)
    if name == "0.0.0.0":
        return name

    try:
        name = gethostbyaddr (name)[0]
    except (socket.herror, socket.gaierror):
        # A tuple is required here: `except A or B` evaluates `A or B`
        # first and therefore only ever catches A.
        pass
    except KeyboardInterrupt:
        cprint ("\nCtrl-C")
        sys.exit (1)

    #
    # Shorten such long names:
    #   a193-212-178-9.deploy.akamaitechnologies.com
    # into
    #   ..178-9.deploy.akamaitechnologies.com
    #
    if len(name) > 33:
        return ".." + name[-33:]
    return name
def addr_port (af, addr, port):
    """Format `addr` and `port` as "addr:port"; IPv6 addresses are bracketed."""
    fmt = "[%s]:%s" if af == AF_INET6 else "%s:%s"
    return fmt % (addr, port)
# Number of rounds started so far in continuous mode.
loops = 0

def do_stop():
    """
    Tell main() whether the current round is the last one.

    Returns 1 when not in continuous mode. In continuous mode it returns
    0; from the second round on it first prints a "----" separator
    (followed by the per-round hit count when a positive program filter
    is active) and sleeps `continuous_wait` seconds. Ctrl-C during the
    sleep ends the program with status 1.
    """
    global loops

    if continuous == 0:
        return 1

    loops += 1
    if loops <= 1:
        return 0

    suffix = ""
    if only_this_program and only_this_program[0] != '!':
        suffix = " %d" % only_this_program_times [_THIS_LOOP]
    cprint ("----" + suffix)

    try:
        time.sleep (continuous_wait)
    except KeyboardInterrupt:
        cprint ("\nCtrl-C")
        sys.exit (1)
    return 0
def main():
    """
    Print the connection table, once or repeatedly in continuous mode.

    Each round refreshes the pid -> process-name map, then prints one
    line per accepted connection and updates the hit counters in
    `only_this_program_times`.
    """
    if numeric:
        header = "~6%-5s %-22s %-22s %2.2s %-12s %-22s %s %s~0"
        templ  = "~2%-5s~0 %-22s %-22s %2.2s ~6%-12s ~2%-20s ~0%5d %s"
    else:
        header = "~6Resolving addresses takes time, hang on...\n%-5s %-22s %-42s %2.2s %-12s %-22s %s %s~0"
        templ  = "~2%-5s~0 %-22s %-42s %2.2s ~6%-12s ~2%-20s ~0%5d %s"

    if have_ip2loc:
        cprint (header % ("Proto", "Local addr", "Remote addr", "", "Status", "Program name", "Pid", "Country (City/Region)"))
    else:
        cprint (header % ("Proto", "Local addr", "Remote addr", "CC", "Status", "Program name", "Pid", ""))

    proc_names = {}

    #########################################################################

    def handle_connection (c):
        """Return (accepted, program name, protocol) for connection `c`."""
        if c.family not in family_accept:
            return False, None, None

        if not show_all and c.status not in ["ESTABLISHED", "TIME_WAIT"]:
            return False, None, None

        p_name = proc_names.get (c.pid, '?').lower()
        if only_this_program:
            if only_this_program[0] == '!':
                # "!name": everything except this program.
                if p_name == only_this_program[1:]:
                    return False, None, None
            elif p_name != only_this_program:
                return False, None, None

        protocol = proto_map [(c.family, c.type)]
        if protocol not in proto_accept:
            return False, None, None
        return True, p_name, protocol

    #########################################################################

    def get_remote_info (c):
        """Return (remote "addr:port" or None, short country, long country)."""
        raddr = None
        short_country = ""
        long_country  = ""

        if c.raddr:
            ip, port = c.raddr
            raddr    = addr_port (c.family, host_name(ip), port)
            country  = get_country (geo, ip)
            if have_ip2loc:
                long_country = country
            else:
                short_country = country

        if have_ip2loc and long_country != "":
            if ip2loc.city and ip2loc.city != "-":
                long_country += " (" + ip2loc.city + ")"
        return raddr, short_country, long_country

    done = 0
    while not done:
        done = do_stop()
        only_this_program_times [_THIS_LOOP] = 0

        for p in psutil.process_iter():
            try:
                proc_names[p.pid] = p.name()
            except psutil.Error:
                pass
            except KeyboardInterrupt:
                cprint ("\nCtrl-C")
                sys.exit (1)

        for c in psutil.net_connections(kind='inet'):
            handle, p_name, protocol = handle_connection (c)
            if not handle:
                continue

            laddr = addr_port (c.family, c.laddr[0], c.laddr[1])
            raddr, short_country, long_country = get_remote_info (c)

            # psutil reports pid as None when the owning process cannot be
            # determined; "%5d" would raise TypeError on None, so show -1.
            pid = c.pid if c.pid is not None else -1

            cprint (templ % (protocol, laddr, raddr or AD,
                             short_country, c.status, p_name,
                             pid, long_country))

            only_this_program_times [_TOTAL_TIMES] += 1
            only_this_program_times [_THIS_LOOP]   += 1
def final_report():
    """Print the total hit count when a positive program filter was given."""
    if not only_this_program or only_this_program[0] == '!':
        return
    hits = only_this_program_times[_TOTAL_TIMES]
    print ("%s found %d times in connection-table" % (only_this_program, hits))
def some_examples():
    """Return the multi-line example text that usage() prints in detailed mode."""
    return r'''
netstat -4tp chrome - show all connected IPv4 TCP ports from programs matching 'chrome'.
netstat -uanp chrome - show all unconnected IPv4/IPv6 UDP ports from programs matching 'chrome'.
netstat -4ncp !chrome - continuously show IPv4 addresses numerically from all programs not matching 'chrome'
netstat -an|grep 'San Francisco' - show all ports for remote locations matching 'San Francisco' (needs IP2Location).
'''
def usage (e = None, detailed = False):
    """
    Print the help text and exit.

    `e`, when given, is an error message printed first; the process then
    exits with status 1 instead of 0 so that callers and shells can tell
    a command-line error from a plain help request. With `detailed` the
    geo-location data files and some examples are listed too.
    """
    if e:
        print (e)

    print ("""Usage: %s [options] <program | !program>
-h - show this help.
-n - show address numerically.
-c - show information every %d second continuously (^C to stop).
-d - turn on some debug.
-a - show all endpoints.
-p - show the program (see below).
-t - show only TCP endpoints.
-u - show only UDP endpoints.
-4 - show only IPv4 endpoints.
-6 - show only IPv6 endpoints.
If '-p' and 'program' is specified, show only endpoints for this 'program'.
If '-p' and '!program' is specified, show only endpoints NOT for this 'program'.
""" % (sys.argv[0], continuous_wait))

    if detailed:
        if use_ip2loc:
            ip2loc_list_data()
        else:
            geoip_list_data()
        print ("\n Examples: %s" % some_examples(), end="")

    sys.exit (1 if e else 0)
def parse_cmd_line():
    """
    Parse sys.argv into the module-level option variables.

    Also initialises the colour console, shows the detailed help for
    '-h', and records the optional program filter (first non-option
    argument, only honoured together with '-p').
    """
    global numeric, continuous, show_all, show_program, debug
    global only_this_program

    def drop (seq, *items):
        # list.remove() raises ValueError for a missing item, which a
        # repeated or combined option (e.g. "-tt", "-4 -4") would trigger.
        for item in items:
            if item in seq:
                seq.remove (item)

    do_help = 0
    try:
        opts, args = getopt.getopt (sys.argv[1:], "hncdaptu46")
    except getopt.GetoptError as e:
        usage ("Error: %s" % e.msg)   # does not return

    for o, a in opts:
        if o == "-h":
            do_help = 1
            break
        elif o == "-n":
            numeric = 1
        elif o == "-c":
            continuous = 1
        elif o == "-a":
            show_all = True
        elif o == "-p":
            show_program = True
        elif o == "-t":
            drop (proto_accept, "udp", "udp6")
        elif o == "-u":
            drop (proto_accept, "tcp", "tcp6")
        elif o == '-d':
            debug = color_print.TRACE = 1
        elif o == '-4':
            drop (family_accept, AF_INET6)
        elif o == '-6':
            drop (family_accept, AF_INET)

    color_print.init_console()
    if not sys.stdout.isatty():
        color_print.trace ("isatty() == 0; Cannot use Windows Console.")

    if do_help:
        usage (detailed = True)

    if args:
        if not show_program:
            print ("Ignoring the remaining cmd-line. Did you forget the use the '-p' option?")
        else:
            only_this_program = args[0].lower()
            # Process names only carry an ".exe" suffix on Windows;
            # appending it elsewhere would make the filter never match.
            if os.name == "nt" and not only_this_program.endswith(".exe"):
                only_this_program += ".exe"
def init():
    """Set up geo-location: IP2Location when preferred and usable, else GeoIP."""
    if use_ip2loc and not ip2loc_init():
        cprint ("Trying GeoIP.")
    if have_ip2loc:
        return
    geoip_init()
# Script entry point, executed at module level: parse the options, set up
# the geo-location backend, print the connection table and finally the
# per-program hit count (printed only for a positive program filter).
parse_cmd_line()
init()
main()
final_report()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment