Last active
February 4, 2017 14:29
-
-
Save creamidea/f18b8a959d52a4b99064e9f02ea3991b to your computer and use it in GitHub Desktop.
测试 IP 联通质量
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/bin/python3 | |
#-*- encoding:utf-8 -*- | |
from operator import itemgetter | |
from queue import Queue | |
from threading import Thread | |
from subprocess import STDOUT, check_output, CalledProcessError | |
from collections import OrderedDict | |
import re | |
class PingTask(Thread):
    """Daemon worker thread that pings a single target from a task queue.

    The thread starts itself on construction.  When it runs it takes one
    ``[ip, results]`` item from ``tasks``, invokes the system ``ping``
    command for ``ip`` and puts exactly one dict onto ``results``:

    * on success, the parsed statistics (``target``, ``transmitted``,
      ``received``, ``loss``, ``min``, ``avg``, ``max``, ``stddev``);
    * when ``ping`` exits with a non-zero status,
      ``{"target": ip, "code": <return code>, "message": <raw output bytes>}``.
    """

    # Summary header, e.g. "--- 111.206.223.205 ping statistics ---".
    p0 = re.compile(r'--- (?P<target>([0-9.]+?)) ping statistics ---')
    # Packet counters.  Accepts both "11 packets received" and the shorter
    # "11 received" wording, optional "+N errors/duplicates" fields, and a
    # loss percentage with or without a fractional part.
    p1 = re.compile(
        r'(?P<transmitted>\d+) packets transmitted, '
        r'(?P<received>\d+) (?:packets )?received, '
        r'(?:\+\d+ \w+, )*'
        r'(?P<loss>\d+(?:\.\d+)?)% packet loss'
    )
    # Round-trip timings.  Accepts the "round-trip .../stddev" and the
    # "rtt .../mdev" spellings; the last field is always exposed as "stddev".
    p2 = re.compile(
        r'(?:round-trip|rtt) min/avg/max/(?:stddev|mdev) = '
        r'(?P<min>\d+(?:\.\d+)?)/(?P<avg>\d+(?:\.\d+)?)/'
        r'(?P<max>\d+(?:\.\d+)?)/(?P<stddev>\d+(?:\.\d+)?) ms'
    )

    def __init__(self, tasks):
        """Store the task queue and start the thread immediately.

        Args:
            tasks: queue whose items are ``[ip, results]`` pairs, where
                ``results`` is the queue that receives the outcome.
        """
        super().__init__()
        self.tasks = tasks
        self.daemon = True
        self.start()

    def convert(self, value):
        """Convert a numeric string to ``float`` or ``int``.

        Strings containing a ``.`` are parsed as ``float``, all others as
        ``int``.  A string that is not a valid number is returned unchanged.
        """
        try:
            return float(value) if '.' in value else int(value)
        except ValueError:
            return value

    def _getTarget(self, retval):
        """Return ``{"target": ...}`` taken from the statistics header.

        Raises:
            AttributeError: if the header pattern is not found in *retval*.
        """
        g0 = self.p0.search(retval)
        return {"target": g0.group('target')}

    def _getLoss(self, retval):
        """Return the transmitted/received/loss counters as numbers.

        Raises:
            AttributeError: if the counters pattern is not found in *retval*.
        """
        g1 = self.p1.search(retval)
        return {
            name: self.convert(g1.group(name))
            for name in ('transmitted', 'received', 'loss')
        }

    def _getSpeed(self, retval):
        """Return the min/avg/max/stddev round-trip times as numbers.

        Raises:
            AttributeError: if the timing pattern is not found in *retval*.
        """
        g2 = self.p2.search(retval)
        return {
            name: self.convert(g2.group(name))
            for name in ('min', 'avg', 'max', 'stddev')
        }

    def parse(self, retval):
        """Parse the text summary printed by ``ping`` into a flat dict.

        Args:
            retval: decoded ``ping`` output containing the statistics block.

        Returns:
            dict with ``target``, ``transmitted``, ``received``, ``loss``,
            ``min``, ``avg``, ``max`` and ``stddev`` keys.

        Raises:
            AttributeError: if any of the three summary lines is missing.
        """
        info = {}
        info.update(self._getTarget(retval))
        info.update(self._getLoss(retval))
        info.update(self._getSpeed(retval))
        return info

    def run(self):
        """Ping one queued target and publish the outcome on its result queue."""
        ip, results = self.tasks.get()
        try:
            print('>>> Start ping {ip}...'.format(ip=ip))
            # stderr is merged into stdout so error text ends up in the
            # captured output as well.
            retval = check_output(
                ["ping", "-c20", "-q", "-i0.1", "-W1", ip], stderr=STDOUT
            )
        except CalledProcessError as e:
            # Non-zero exit status: report the raw output produced before
            # the failure together with the return code.
            results.put(
                {"target": ip, "code": e.returncode, "message": e.output}
            )
        else:
            results.put(self.parse(retval.decode('utf-8')))
        finally:
            # Always mark the item done so that tasks.join() cannot hang.
            self.tasks.task_done()
def ReadIPs(filename = 'ips.txt'):
    """Yield, once, the list of ping targets stored in *filename*.

    The file is read as UTF-8 text with one target per line.  Surrounding
    whitespace (including the ``\\r`` of CRLF line endings) is stripped and
    blank lines are skipped, so a trailing newline does not produce an empty
    target.

    Args:
        filename: path of the text file to read.

    Yields:
        A single list of target strings, in file order.
    """
    with open(filename, encoding='utf-8') as f:
        stripped = (line.strip() for line in f)
        targets = [line for line in stripped if line]
    yield targets
def handleResults(results):
    """Print the successful ping results, best connection first.

    Entries carrying a ``"code"`` key (failed pings) are dropped; the rest
    are ordered by ``loss``, then ``avg``, ``min`` and ``max`` and handed to
    ``printInfo``.

    Args:
        results: queue of result dicts filled by the ping workers.
    """
    snapshot = list(results.queue)
    successful = sorted(
        (entry for entry in snapshot if "code" not in entry),
        key=itemgetter('loss', 'avg', 'min', 'max'),
    )
    printInfo(successful)
def printInfo(results):
    """Print one tab-separated summary line per result dict.

    Each line shows the entry's position followed by its ``loss``, ``avg``,
    ``min`` and ``max`` values and finally the ``target``.

    Args:
        results: iterable of dicts with ``target``, ``loss``, ``avg``,
            ``min`` and ``max`` keys.
    """
    template = "{index}: [{loss}]\t[{avg}]\t[{min}]\t[{max}]:\t<{target}>"
    wanted = ("target", "loss", "avg", "min", "max")
    for position, entry in enumerate(results):
        fields = {key: entry[key] for key in wanted}
        print(template.format(index=position, **fields))
if __name__ == '__main__':
    # Hard-coded sample targets; enable the commented assignment below to
    # use them instead of the file contents.
    pools = ["192.168.1.100", "192.168.1.1"]
    filename = 'ips.txt'
    # ReadIPs is a generator that yields the complete target list once.
    targets = next(ReadIPs(filename))
    #targets = pools
    count = len(targets)
    # Both queues are sized to hold one entry per target.
    tasks = Queue(count)
    results = Queue(count)
    for target in targets:
        tasks.put([target, results])
        # Each PingTask starts itself and consumes exactly one queued item.
        PingTask(tasks)
    # Wait until every worker has marked its item as done.
    tasks.join()
    handleResults(results)
# a = ''' | |
# --- 111.206.223.205 ping statistics --- | |
# 11 packets transmitted, 11 packets received, 36.4% packet loss | |
# round-trip min/avg/max/stddev = 2325.497/3858.896/5928.119/1244.209 ms | |
# ''' | |
# print(a) | |
# b = "100 packets transmitted" | |
# print(p1.search(a).groups()) | |
# print(p2.search(a).groups()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment