Last active
June 22, 2021 01:26
-
-
Save toxicantidote/9d89ad6d9003bbc67ed1a7a6289eac9d to your computer and use it in GitHub Desktop.
Network host ping tool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## Site ping check tool ## | |
## | |
## Originally developed and tested on Win10 with Python 2.7. Updates made with | |
## Python 3.7, but should still work on Python 2.x provided the Tk install | |
## isn't ancient. | |
## | |
## Reads one or more files containing details of sites to ping, pings them at | |
## the defined frequency, and displays the result in a GUI. | |
## | |
## Within the host files, the last word on each line is considered to be the | |
## target, and the other words on the line are the name displayed in the GUI. | |
## It is intended that this will allow the use of local hostnames, FQDNs and | |
## IPs for targets. Lines that are blank or start with # are ignored, allowing | |
## for neat formatting and comments. | |
## | |
## Host list files to load; a tab is made in the GUI for each readable file
host_lists = [
    'important hosts.txt',
    'some other hosts.txt',
]
## Pause between one round of pings finishing and the next starting (seconds)
frequency = 30
## Number of worker threads, i.e. how many hosts are pinged at the same time
threads = 20
## How long each ping waits for a reply (seconds)
timeout = 1
### | |
### NO EDITING BELOW HERE ### | |
import sys | |
import subprocess | |
import threading | |
from multiprocessing.pool import ThreadPool | |
import re | |
import time | |
import datetime | |
if sys.version_info[0] < 3: | |
import Queue as queue | |
import Tkinter as tkinter | |
import tkMessageBox as messagebox | |
import ttk | |
else: | |
import queue | |
import tkinter | |
import tkinter.messagebox as messagebox | |
import tkinter.ttk as ttk | |
### | |
class ui():
    """Tk window showing the most recent ping result for every configured host.

    A notebook tab is created for each readable, non-empty host list file.
    Each host gets a name label (whose background colour encodes its state)
    and a status label holding the time of the last reply. Ping results are
    pushed by worker threads through update_time() and applied to the
    widgets on the Tk thread by _update_times().
    """

    ## label background colours double as the per-host state markers
    COLOUR_UNKNOWN = 'light yellow'
    COLOUR_BAD = 'coral1'
    ## interval between GUI refreshes (milliseconds)
    POLL_MS = 500

    ## called when the window is closed
    def destroy(self):
        """Flag both the GUI and the pinger as terminating, then close the window."""
        print('Exiting!')
        self.terminate = True
        self.pinger.terminate = True
        self.root.destroy()

    ## called when the class is initialised
    def __init__(self):
        """Build the window, read every file in host_lists and start the pinger."""
        self.terminate = False
        ## the result queue must exist before any thread that may report
        ## results is started
        self.update_queue = queue.Queue()
        self.pinger = pinger(self)
        self.pinger.start()

        ## basic ui config
        self.root = tkinter.Tk()
        self.root.protocol('WM_DELETE_WINDOW', self.destroy)
        self.root.resizable(0, 0)
        self.root.title('Site ping tool')

        ## create icons
        icon_b64_title = 'R0lGODlhGAAYAIABAAAAAP///yH5BAEKAAEALAAAAAAYABgAAAJPjI+py+0GonxJ2vms0gz3+CEcBCwgeZXkpnoBdh5xLKuvfa96zusn98MFUa2iyHgb9nrFX62CS0ZpOx6NulMuj09mVFR1Uq7fhstFSaspBQA7'
        icon_b64_ok = 'R0lGODlhFAAUAIABAAD+AP///yH5BAEKAAEALAAAAAAUABQAAAIyjB+gi30LmUuxqmYzQNq+Ll0UaDCgeEZltkItO26xtr7kx1a2Oqe4/zthgI7OZOhyFAAAOw=='
        icon_b64_alert = 'R0lGODlhFAAUAIABAP4AAP///yH5BAEKAAEALAAAAAAUABQAAAIyjI+pGwAM24vKOZrsxFLzbnGaR4VdaZIMqVZsi4yG7L4weOHbPPatD9wAeT0ibRhMAgoAOw=='
        icon_b64_unknown = 'R0lGODlhFAAUAIABAP7+AP///yH5BAEKAAEALAAAAAAUABQAAAInDI6paOsP4wtNMootwLxCmoCSeJBa6WnZuZnWyrrsjNKtLdv6kqoFADs='
        self.icon_title = tkinter.PhotoImage(data = icon_b64_title)
        self.icon_ok = tkinter.PhotoImage(data = icon_b64_ok)
        self.icon_alert = tkinter.PhotoImage(data = icon_b64_alert)
        self.icon_unknown = tkinter.PhotoImage(data = icon_b64_unknown)

        ## window icon
        self.root.tk.call('wm', 'iconphoto', self.root._w, self.icon_title)

        ## widget object storage for later reference
        ## nb_widgets: host file name -> notebook tab index
        ## ip_widgets: target -> list of [name label, status label, tab index];
        ## a list is kept because the same target may appear in several files
        self.nb_widgets = dict()
        self.ip_widgets = dict()

        ## progress bar for next ping
        self.progress_bar = ttk.Progressbar(self.root, orient = 'horizontal', length = 100, mode = 'determinate', maximum = frequency)
        self.progress_text = tkinter.Label(self.root, text= 'Loading hosts..')
        self.progress_bar.grid(row = 0, column = 0, sticky = 'ew')
        self.progress_text.grid(row = 0, column = 1, sticky = 'w')
        self.progress_bar.start()

        ## notebook
        self.nb = ttk.Notebook(self.root)
        self.nb.grid(row = 1, column = 0, columnspan = 2, sticky = 'news')
        self.root.update()

        ## make a tab for each file
        self.all_hosts = []
        tab_id = 0
        for fname in host_lists:
            ## get a list of hosts
            try:
                with open(fname, 'r') as hfile:
                    content = hfile.read().splitlines()
            except (IOError, OSError, UnicodeError):
                messagebox.showerror('Unable to read host list', 'An error occurred while reading ' + fname + '\nDoes the file exist and can it be read by the current user?')
                continue

            hosts = []
            for line in content:
                parsed = self._parse_host_line(line)
                if parsed is not None:
                    hosts.append(parsed)

            ## move to the next file if no hosts were found
            if len(hosts) < 1:
                print('No hosts in ' + fname)
                messagebox.showwarning('No hosts found', 'No host entries were found while reading ' + fname + '\n\nThis file has been ignored')
                continue
            print('Found ' + str(len(hosts)) + ' hosts in ' + fname)

            ## make the tab for this file
            nb_frame = tkinter.Frame(self.nb)
            clean_name = fname.replace('_', ' ')
            clean_name = re.sub(r'\..+$', '', clean_name)
            self.nb.add(nb_frame, text = clean_name.capitalize(), image = self.icon_unknown, compound = tkinter.TOP)
            self.nb_widgets[fname] = tab_id

            ## make the gui elements for each host
            for row, (name, address) in enumerate(hosts):
                name_label = tkinter.Label(nb_frame, text = name, background = self.COLOUR_UNKNOWN, anchor = 'w', padx = 10)
                status_label = tkinter.Label(nb_frame, text = 'No replies seen', width = 22, anchor = 'w', padx = 10)
                name_label.grid(row = row, column = 0, sticky = 'ew')
                status_label.grid(row = row, column = 1, sticky = 'ew')
                self.ip_widgets.setdefault(address, []).append([name_label, status_label, tab_id])
                ## ping each distinct target only once per round
                if address not in self.all_hosts:
                    self.all_hosts.append(address)
            tab_id += 1

        if len(self.all_hosts) < 1:
            messagebox.showerror('No hosts found', 'No hosts were found in any available host list file')

    @staticmethod
    def _parse_host_line(line):
        """Split one host file line into (display name, target).

        The last whitespace-separated word is the target and everything
        before it is the display name. Returns None for blank lines, comment
        lines (first non-blank character is #) and lines with only one word.
        """
        text = line.strip()
        if not text or text.startswith('#'):
            return None
        parts = text.rsplit(None, 1)
        if len(parts) != 2:
            return None
        return parts[0], parts[1]

    @staticmethod
    def _classify_tabs(states):
        """Work out which tabs contain failing or not-yet-seen hosts.

        states is an iterable of (background colour, tab index) pairs.
        Returns (bad, unknown): two independent sets of tab indexes.
        """
        bad, unknown = set(), set()
        for background, tab_id in states:
            if background == ui.COLOUR_UNKNOWN:
                unknown.add(tab_id)
            elif background == ui.COLOUR_BAD:
                bad.add(tab_id)
        return bad, unknown

    def run(self):
        """Schedule the first ping round and enter the Tk main loop."""
        self.root.after(100, self.do_ping)
        self.root.mainloop()

    def do_ping(self):
        """Start a round of pings and begin polling for the replies.

        Returns straight away; the wait for replies and the countdown to the
        next round are driven by root.after() callbacks so the Tk event loop
        is never blocked.
        """
        if self.terminate:
            return
        self.pinger.start_ping(self.get_hosts())
        self.progress_bar.config(mode = 'indeterminate')
        self.progress_text.config(text = 'Waiting for replies...')
        self.progress_bar.start()
        self._poll_replies()

    def _poll_replies(self):
        """Apply queued results until the pinger reports the round is finished."""
        if self.terminate:
            return
        if self._update_times():
            self.update_tabs()
        if self.pinger.active:
            self.root.after(self.POLL_MS, self._poll_replies)
            return
        self.progress_bar.stop()
        self.progress_bar.config(maximum = frequency, mode = 'determinate')
        self._countdown(time.time() + frequency)

    def _countdown(self, end):
        """Show the time left until `end` (epoch seconds), then ping again."""
        if self.terminate:
            return
        ## accept ping data any time..
        if self._update_times():
            self.update_tabs()
        now = time.time()
        if now >= end:
            self.do_ping()
            return
        remaining = int((end - now) + 1)
        self.progress_bar.config(value = (frequency - remaining))
        remaining_text = str(datetime.timedelta(seconds = remaining))
        self.progress_text.config(text = 'Next ping in ' + remaining_text)
        self.root.after(self.POLL_MS, self._countdown, end)

    def get_hosts(self):
        """Return the list of distinct targets read from the host files."""
        return self.all_hosts

    def update_time(self, host, colour, message = None):
        """Queue a result for `host`; this is the entry point used by ping threads.

        colour is the new background for the host's name label. message, when
        given, replaces the status text. Nothing is queued once the GUI is
        terminating.
        """
        if self.terminate == False:
            self.update_queue.put([host, colour, message])

    ## update last ping time for a host
    def _update_times(self):
        """Apply every queued result to the widgets.

        Returns True when at least one result was applied, otherwise False.
        """
        changed = False
        while True:
            try:
                host, colour, message = self.update_queue.get_nowait()
            except queue.Empty:
                return changed
            try:
                for name_label, status_label, _tab_id in self.ip_widgets[host]:
                    name_label.configure(background = colour)
                    if message is not None:
                        status_label.configure(text = str(message))
                changed = True
            except (KeyError, tkinter.TclError):
                ## unknown host, or the widget has already been destroyed
                print('Error updating time for ' + str(host))

    ## update tab icons to reflect the current state
    def update_tabs(self, event = None):
        """Set each tab icon: alert if any host is bad, else unknown, else ok."""
        states = []
        for entries in self.ip_widgets.values():
            for name_label, _status_label, tab_id in entries:
                states.append((str(name_label['background']), tab_id))
        bad, unknown = self._classify_tabs(states)

        ## update tab colours
        for tab_id in self.nb_widgets.values():
            if tab_id in bad:
                icon = self.icon_alert
            elif tab_id in unknown:
                icon = self.icon_unknown
            else:
                icon = self.icon_ok
            self.nb.tab(tab_id, image = icon)
class pinger(threading.Thread):
    """Controller thread that hands ping jobs to a pool of worker threads.

    start_ping() queues a batch of targets and raises `active`. The
    controller loop in run() lowers `active` again once every queued target
    has been processed. Each result is reported through gui.update_time().
    """

    def __init__(self, gui):
        """Create the controller; `gui` must provide update_time(host, colour, message)."""
        ## initialise self as a thread
        threading.Thread.__init__(self)
        self.name = 'Ping-controller'
        ## never keep the interpreter alive once the main thread has finished
        self.daemon = True
        self.gui = gui
        self.pq = queue.Queue()
        self.terminate = False
        self.active = False

    def __del__(self):
        self.terminate = True

    def run(self):
        """Start the worker threads, then track completion of each batch."""
        global threads
        ## make a thread pool
        for i in range(threads):
            t = threading.Thread(target = self.ping, name = 'Ping-worker-' + str(i))
            t.daemon = True
            t.start()
        while self.terminate == False:
            ## `active` is only raised after a whole batch has been queued, so
            ## join() returning means that batch has been fully processed
            if self.active:
                self.pq.join()
                self.active = False
            ## sleep here to reduce CPU usage from polling the flag
            time.sleep(0.1)

    def queue_size(self):
        """Return the approximate number of targets still waiting to be pinged."""
        return self.pq.qsize()

    def start_ping(self, targets):
        """Queue every host in `targets`; refused while a batch is still running."""
        if self.active == True:
            print('ERROR: Not starting new ping. Existing pings in queue')
            return
        ## push the queue to the workers
        for host in targets:
            self.pq.put(host)
        ## raise the flag last so the controller cannot clear it mid-fill
        self.active = True

    @staticmethod
    def _build_command(target, wait_seconds, platform = None):
        """Return the argument list for a single ping of `target`.

        Windows ping takes its wait time in whole milliseconds (-w). Other
        platforms are given the value unchanged via -W.
        """
        if platform is None:
            platform = sys.platform
        if platform == 'win32':
            return ['ping', '-n', '1', '-w', str(int(wait_seconds * 1000)), target]
        return ['ping', '-c', '1', '-W', str(wait_seconds), target]

    @staticmethod
    def _parse_reply(output, platform = None):
        """Return the average round trip time (as text) from ping output.

        Returns None unless the output reports both 0% loss and a round trip
        time. The loss figure is matched with or without parentheses and with
        or without the word "packet" so that both "(0% loss)" and
        "0% packet loss" count, while "100% packet loss" does not.
        """
        if platform is None:
            platform = sys.platform
        if not re.search(r'(?<![\d.])0(?:\.0+)?%(?: packet)? loss', output):
            return None
        if platform == 'win32':
            re_time = re.search(r'Average = (\d+)ms', output)
        else:
            re_time = re.search(r'(?:rtt|round-trip) min/avg.+=\s[\d.]+/([\d.]+)/', output)
        if re_time:
            return re_time.group(1)
        return None

    def _check_host(self, target):
        """Ping `target` once and report the outcome to the GUI."""
        global timeout
        ## an argument list (no shell) so text from a host file is never
        ## interpreted by the shell
        command = self._build_command(target, timeout)
        try:
            output = subprocess.check_output(command, stderr = subprocess.STDOUT)
        except subprocess.CalledProcessError as err:
            ## a non-zero exit still carries the output we want to inspect
            output = err.output
        except OSError as err:
            ## the ping program itself could not be started
            print('ERROR: Unable to run ping: ' + str(err))
            output = ''
        if output is None:
            output = ''
        if isinstance(output, bytes):
            output = output.decode('utf-8', 'replace')

        rtt = self._parse_reply(output)
        now_text = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
        stamp = '\n[' + now_text + '] '
        ## results reach the GUI through its queue to avoid race conditions
        if rtt is not None:
            print(stamp + 'Reply from ' + target + ' (' + str(rtt) + 'ms)')
            self.gui.update_time(target, 'OliveDrab1', now_text + ' (' + str(rtt) + 'ms)')
        else:
            print(stamp + 'NO REPLY FROM ' + target)
            self.gui.update_time(target, 'coral1')

    def ping(self):
        """Worker loop: take targets from the queue and ping them one at a time."""
        while self.terminate == False:
            target = self.pq.get()
            try:
                self._check_host(target)
            except Exception as err:
                ## keep the worker alive; a dead worker would shrink the pool
                print('ERROR: ping worker failed for ' + str(target) + ': ' + str(err))
            finally:
                ## always mark the job done, otherwise pq.join() never returns
                self.pq.task_done()
## start here
## only launch the GUI when run as a script, so importing this module has
## no side effects
if __name__ == '__main__':
    gui = ui()
    gui.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Needs some more work to properly have the pinger thread use the tkinter events system.