Created
April 29, 2016 09:59
-
-
Save standa/ae5fc16b508b01fdc86630a587019e85 to your computer and use it in GitHub Desktop.
Infinite loop internet speed test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Prerequisite: download the speedtest wrapper (and, optionally, the
# Python client it drives) into the current directory:
#   wget https://raw.githubusercontent.com/rsvp/speedtest-linux/master/speedtest
#   wget https://raw.githubusercontent.com/sivel/speedtest-cli/master/speedtest_cli.py
#
# Append one result to the log every 900 seconds (15 minutes), forever.
while :
do
    ./speedtest --log
    sleep 900
done
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash | |
# | |
# downloaded from https://github.com/rsvp/speedtest-linux | |
# | |
# bash 4.3.11(1) Linux Ubuntu 14.04.1 Date : 2016-01-18 | |
# | |
# _______________| speedtest : ping, download and upload speeds. | |
# Commandline using speedtest.net resources. | |
# This standalone script is a wrapper. | |
# | |
# Usage: speedtest [--log|--simple|--verbose|--list|--fav|--version] | |
# | |
# Takes about a minute for results to appear. | |
# For logfile, directory variable $logdir should be modified. | |
# | |
# Examples: $ speedtest --simple | |
# Ping: 22.602 ms | |
# Download: 0.62 Mbit/s | |
# Upload: 0.25 Mbit/s | |
# | |
# $ speedtest # No args for single line timestamped. | |
# 2015-03-13, 19:25, 22.602, 0.62, 0.25 | |
# | |
# $ speedtest --log # Will cat logfile with latest result. | |
# | |
# $ speedtest --log tmp.log # else default: speedtest.log | |
# | |
# Dependencies: curl [ Used to download the following Python script: ] | |
# speedtest-cli.py ( https://github.com/sivel/speedtest-cli ) | |
# ^tested on Python versions 2.4 through 3.4. | |
# CHANGE LOG ORIGIN: https://github.com/rsvp/speedtest-linux | |
# 2016-01-18 Fix issue #2 by using $HOME instead of /home/${USER} for logdir. | |
# @oeolartep: $USER was unbound variable in cron jobs. | |
# 2015-03-19 Second argument names log file. Directory's existence tested. | |
# 2015-03-17 Clarify some comments. | |
# 2015-03-13 This script ALWAYS retrieves the latest dependent code. | |
# Currently it is version 0.3.2 from the primary source. | |
# _____ PREAMBLE_v3: settings, variables, and error handling. | |
# | |
#  Locale: plain POSIX/ASCII rules, chiefly so that glob matching and
#  sorting of characters behave predictably.
LC_ALL=POSIX

#  Globbing: enable extended patterns (extglob) and treat a pattern that
#  matches nothing as an error (failglob).
shopt -s extglob failglob

#  Strict mode:
#       -e           exit immediately when a command returns non-zero,
#       -u           referencing an unassigned variable is an error,
#       -o pipefail  a pipeline fails if ANY stage fails, not just the last.
set -euo pipefail

#  Positional arguments with defaults (required because of set -u).
#       arg1 : the option, 'NULL' when the script is called without one.
#       arg2 : name of the logfile, see $logf below.
arg1=${1:-'NULL'}
arg2=${2:-'speedtest.log'}

#  Name of this script without its directory (like basename).
program=${0##*/}

#  Two temporary files under /dev/shm:
#       memf : receives the downloaded Python script,
#       errf : printed to stderr by die().
memf=$( mktemp /dev/shm/88_${program}_tmp.XXXXXXXXXX )
errf=$( mktemp /dev/shm/88_${program}_tmp.XXXXXXXXXX )
cleanup () {
     #  Delete the temporary files, then optionally exit.
     #  Usage:  cleanup [status]
     #       status defaults to 0; the special value -1 means
     #       "clean up but do NOT exit" (used by die).
     local status=${1:-'0'}
     #  Quoted so that a script name containing whitespace or glob
     #  characters cannot split or expand the temp-file paths.
     rm -f -- "$memf" "$errf"
     [ "$status" = '-1' ] || exit "$status"    #  thus -1 prevents exit.
} #--------------------------------------------------------------------
warn () {
     #  Write a message, prefixed with this script's basename, to stderr.
     #  Usage:  warn "message"
     local message="\n !! ${program}: $1 "
     echo -e "$message" 1>&2
} #--------------------------------------------------------------------
die () {
     #  Dump captured error output, clean up, warn, then exit.
     #  Usage:  command || die "message" [status]
     #       status defaults to the exit status of the most recent command.
     local status=${2:-"$?"}
     #  Showing the error file is best-effort: a missing or unreadable
     #  file must not stop the cleanup and exit that follow.
     cat "$errf" >&2 || true
     cleanup -1  &&  warn "$1"  &&  exit "$status"
} #--------------------------------------------------------------------
trap "die 'SIG disruption: but finish needs about one minute.' 114" 1 2 3 15
#    Cleanup after INTERRUPT: 1=SIGHUP, 2=SIGINT, 3=SIGQUIT, 15=SIGTERM
trap "die 'unhandled ERR via trap, but cleanup finished.' 116" ERR
#    Cleanup after command failure unless it's part of a test clause.
#
# _______________     ::  BEGIN  Script ::::::::::::::::::::::::::::::::::::::::

favorite='1335'
#         ^DreamHost in Los Angeles for --fav option.
#          Use --list to find your favorite.

logdir="$HOME/var/log/net"
#       ^RENAME log directory for your personal use; for --log option.
[ -d "$logdir" ] || mkdir -p "$logdir" || die "fail creating $logdir" 117
#  Check directory's existence, else create it.
logf="$logdir/$arg2"
#  Suggested first line HEADER for CSV:  Date, Time, Ping, Down, Up

#  Relying on a STATIC LOCAL VERSION may get outdated, so
#  DOWNLOAD and execute LATEST PRIMARY VERSION of speedtest-cli.py:
source='https://raw.githubusercontent.com/sivel/speedtest-cli/master/speedtest_cli.py'
#  If @sivel disappears for any reason, try mirror @rsvp.
#
#  Retrieve source and place it in memory with execute permission.
#  The download is EXECUTED below, therefore:
#       - no -k/--insecure: the TLS certificate must verify;
#       - --fail: an HTTP error (e.g. 404) is a failure instead of an
#         error page saved as if it were the script;
#       - --show-error with stderr sent to $errf, so die can print the
#         reason even though --silent is in effect.
curl -L --fail --silent --show-error "$source" > "$memf" 2> "$errf" \
     || die "curl unable to retrieve $program source code." 113
chmod 755 "$memf"
timestamped () {
     #  Print one CSV line: date, time, ping, download, upload.
     #  Sample result:  "2015-03-13, 19:25, 22.602, 0.62, 0.25"
     #
     #  The --simple output looks like
     #       Ping: 22.602 ms
     #       Download: 0.62 Mbit/s
     #       Upload: 0.25 Mbit/s
     #  The first sed expression drops the label, the second swaps the
     #  unit for ", "; the unquoted echo joins the three lines into one.
     epoch="$( date '+%Y-%m-%d, %H:%M' )"
     speeds="$( echo $( $memf --simple \
                             | sed -e 's/^.*: //' \
                                   -e 's/ .*s$/, /' ))"
     #  Combine, then strip the comma left dangling at the very end.
     echo "$epoch, $speeds" \
          | sed -e 's/,$//'
}
#  Dispatch on the first argument. The patterns are disjoint literals, so
#  only the catch-all has to come last.
case "$arg1" in

     --simple)
          #  No server specified: the program picks the one with the
          #  lowest latency.
          $memf --simple
          ;;

     --fav)
          $memf --simple --server $favorite
          ;;

     --verbose)
          $memf
          ;;

     --list)
          #  List of worldwide servers.
          $memf --list
          ;;

     --version)
          $memf --version || true
          ;;

     NULL)
          #  No argument at all: single timestamped CSV line on stdout.
          timestamped
          ;;

     --log)
          #  Append the result to the logfile, then show the whole log.
          timestamped >> "$logf"  &&  cat "$logf"
          ;;

     --default)
          echo "Default log directory: $logdir "
          echo "Default log file: $arg2 "
          echo "--fav test server: $favorite"
          ;;

     *)
          die "undefined arg: $arg1" 115
          ;;

esac

cleanup    #  Instead of: trap arg EXIT
# _______________ EOS :: END of Script :::::::::::::::::::::::::::::::::::::::: | |
# vim: set fileencoding=utf-8 ff=unix tw=78 ai syn=sh : |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# downloaded from https://github.com/sivel/speedtest-cli | |
# | |
# -*- coding: utf-8 -*- | |
# Copyright 2012-2015 Matt Martz | |
# All Rights Reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); you may | |
# not use this file except in compliance with the License. You may obtain | |
# a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |
# License for the specific language governing permissions and limitations | |
# under the License. | |
import os | |
import re | |
import sys | |
import math | |
import signal | |
import socket | |
import timeit | |
import platform | |
import threading | |
__version__ = '0.3.4'

# Some global variables we use
# Cached User-Agent string; filled in by build_user_agent().
user_agent = None
# Source IP address to bind to; set from --source, read by bound_socket().
source = None
# threading.Event created in speedtest() and set by the Ctrl-C handler.
shutdown_event = None
# URL scheme prepended by build_request() to URLs starting with ':';
# switched to 'https' by --secure.
scheme = 'http'

# Used for bound_interface
# Keep a reference to the real socket class, because speedtest() replaces
# socket.socket with bound_socket when --source is given.
socket_socket = socket.socket
try: | |
import xml.etree.cElementTree as ET | |
except ImportError: | |
try: | |
import xml.etree.ElementTree as ET | |
except ImportError: | |
from xml.dom import minidom as DOM | |
ET = None | |
# Begin import game to handle Python 2 and Python 3 | |
try: | |
from urllib2 import urlopen, Request, HTTPError, URLError | |
except ImportError: | |
from urllib.request import urlopen, Request, HTTPError, URLError | |
try: | |
from httplib import HTTPConnection, HTTPSConnection | |
except ImportError: | |
e_http_py2 = sys.exc_info() | |
try: | |
from http.client import HTTPConnection, HTTPSConnection | |
except ImportError: | |
e_http_py3 = sys.exc_info() | |
raise SystemExit('Your python installation is missing required HTTP ' | |
'client classes:\n\n' | |
'Python 2: %s\n' | |
'Python 3: %s' % (e_http_py2[1], e_http_py3[1])) | |
try: | |
from Queue import Queue | |
except ImportError: | |
from queue import Queue | |
try: | |
from urlparse import urlparse | |
except ImportError: | |
from urllib.parse import urlparse | |
try: | |
from urlparse import parse_qs | |
except ImportError: | |
try: | |
from urllib.parse import parse_qs | |
except ImportError: | |
from cgi import parse_qs | |
try: | |
from hashlib import md5 | |
except ImportError: | |
from md5 import md5 | |
try: | |
from argparse import ArgumentParser as ArgParser | |
except ImportError: | |
from optparse import OptionParser as ArgParser | |
try:
    import builtins
except ImportError:
    # Python 2: no ``builtins`` module, so provide our own print function.
    def print_(*args, **kwargs):
        """Emulate the Python 3 ``print`` function on Python 2.

        Adapted from https://pypi.python.org/pypi/six/

        Accepts the keyword arguments ``file``, ``sep`` and ``end``; any
        other keyword raises TypeError. Nothing is written when ``file``
        is None.
        """
        stream = kwargs.pop("file", sys.stdout)
        if stream is None:
            return

        def emit(chunk):
            # Anything that is not already text is converted with str().
            if not isinstance(chunk, basestring):
                chunk = str(chunk)
            stream.write(chunk)

        # Switch to unicode separators as soon as any piece is unicode.
        use_unicode = False

        sep = kwargs.pop("sep", None)
        if sep is not None:
            if isinstance(sep, unicode):
                use_unicode = True
            elif not isinstance(sep, str):
                raise TypeError("sep must be None or a string")

        end = kwargs.pop("end", None)
        if end is not None:
            if isinstance(end, unicode):
                use_unicode = True
            elif not isinstance(end, str):
                raise TypeError("end must be None or a string")

        if kwargs:
            raise TypeError("invalid keyword arguments to print()")

        if not use_unicode:
            for candidate in args:
                if isinstance(candidate, unicode):
                    use_unicode = True
                    break

        if use_unicode:
            default_end = unicode("\n")
            default_sep = unicode(" ")
        else:
            default_end = "\n"
            default_sep = " "

        if sep is None:
            sep = default_sep
        if end is None:
            end = default_end

        for position, item in enumerate(args):
            if position:
                emit(sep)
            emit(item)
        emit(end)
else:
    # Python 3: use the real print function. getattr() is required because
    # ``builtins.print`` would not even parse on Python 2.
    print_ = getattr(builtins, 'print')
    del builtins
class SpeedtestCliServerListError(Exception):
    """Signal that the current server-list URL is unusable.

    Raised and caught inside closestServers() to skip to the next
    candidate URL for retrieving speedtest.net server details.
    """
    pass
def bound_socket(*args, **kwargs):
    """Create a socket that is bound to the configured source address.

    Builds the socket with the original ``socket.socket`` class (saved in
    ``socket_socket``), forwarding all arguments, and binds it to the
    module-level ``source`` address on port 0 before returning it.
    """
    bound = socket_socket(*args, **kwargs)
    local_address = (source, 0)
    bound.bind(local_address)
    return bound
def distance(origin, destination):
    """Return the distance in km between two ``[lat, lon]`` pairs.

    Both arguments are two-item sequences of degrees. Uses the haversine
    formula with a radius of 6371 km.
    """
    earth_radius_km = 6371

    (lat_a, lon_a), (lat_b, lon_b) = origin, destination

    # Sines of half the latitude / longitude differences.
    sin_half_dlat = math.sin(math.radians(lat_b - lat_a) / 2)
    sin_half_dlon = math.sin(math.radians(lon_b - lon_a) / 2)

    haversine = (sin_half_dlat * sin_half_dlat +
                 math.cos(math.radians(lat_a)) *
                 math.cos(math.radians(lat_b)) * sin_half_dlon *
                 sin_half_dlon)
    central_angle = 2 * math.atan2(math.sqrt(haversine),
                                   math.sqrt(1 - haversine))
    return earth_radius_km * central_angle
def build_user_agent():
    """Return the Mozilla/5.0 compatible User-Agent string.

    The string is built once from the platform, Python version and
    ``__version__``, stored in the module-level ``user_agent`` and reused
    on every later call.
    """
    global user_agent

    if not user_agent:
        system_info = '(%s; U; %s; en-us)' % (platform.system(),
                                              platform.architecture()[0])
        pieces = [
            'Mozilla/5.0',
            system_info,
            'Python/%s' % platform.python_version(),
            '(KHTML, like Gecko)',
            'speedtest-cli/%s' % __version__,
        ]
        user_agent = ' '.join(pieces)

    return user_agent
def build_request(url, data=None, headers=None):
    """Build a urllib2 request object

    This function automatically adds a User-Agent header to all requests

    ``url`` may start with ``:`` (e.g. ``'://host/path'``), in which case
    the module-level ``scheme`` is prepended. ``data`` is passed through
    as the request body. ``headers`` is an optional mapping of extra
    headers; it is copied, so neither the caller's dict nor a shared
    default is modified.
    """
    if url.startswith(':'):
        schemed_url = '%s%s' % (scheme, url)
    else:
        schemed_url = url

    # Work on a private copy: the original mutated the dict it was given
    # (and used a mutable default argument).
    request_headers = dict(headers or {})
    request_headers['User-Agent'] = user_agent
    return Request(schemed_url, data=data, headers=request_headers)
def catch_request(request):
    """Open ``request`` and report common connection errors as a value.

    Returns ``(handle, False)`` on success, or ``(None, exception)`` when
    urlopen raises HTTPError, URLError or socket.error.
    """
    try:
        handle = urlopen(request)
    except (HTTPError, URLError, socket.error):
        # sys.exc_info() keeps this working on very old Python versions
        # that lack the ``except ... as`` syntax.
        return None, sys.exc_info()[1]
    return handle, False
class FileGetter(threading.Thread):
    """Thread class for retrieving a URL

    After the thread has run, ``result`` is a list of the chunk sizes (in
    bytes) that were read; its sum is the number of bytes downloaded.
    """

    def __init__(self, url, start):
        """Remember the URL and the start time of the whole test.

        ``start`` is a ``timeit.default_timer()`` value; run() skips the
        download when more than 10 seconds have passed since then.
        """
        self.url = url
        self.result = None
        self.starttime = start
        threading.Thread.__init__(self)

    def run(self):
        """Download the URL in 10240-byte chunks, recording chunk sizes.

        Stops at end of data or when the global ``shutdown_event`` is set.
        IOError is swallowed on purpose: a failed download just
        contributes the bytes read so far.
        """
        self.result = [0]
        try:
            if (timeit.default_timer() - self.starttime) <= 10:
                request = build_request(self.url)
                f = urlopen(request)
                try:
                    while not shutdown_event.isSet():
                        self.result.append(len(f.read(10240)))
                        if self.result[-1] == 0:
                            break
                finally:
                    # Close the response even when read() raises; the
                    # original leaked it on that path.
                    f.close()
        except IOError:
            pass
def downloadSpeed(files, quiet=False):
    """Function to launch FileGetter threads and calculate download speeds

    ``files`` is a sequence of URLs; one FileGetter thread is started per
    URL, with at most 6 queued at a time. Unless ``quiet`` is true, a dot
    is written to stdout for every thread started. Returns the total
    number of bytes read divided by the elapsed seconds.
    """

    def is_alive(thread):
        # Thread.isAlive() was removed in Python 3.9, while is_alive()
        # does not exist before Python 2.6: use whichever is available.
        check = getattr(thread, 'is_alive', None) or thread.isAlive
        return check()

    def producer(q, files):
        for file in files:
            # ``start`` is the enclosing function's timestamp, assigned
            # just before this producer thread is started.
            thread = FileGetter(file, start)
            thread.start()
            q.put(thread, True)
            if not quiet and not shutdown_event.isSet():
                sys.stdout.write('.')
                sys.stdout.flush()

    finished = []

    def consumer(q, total_files):
        while len(finished) < total_files:
            thread = q.get(True)
            # Join in short slices so the main thread stays responsive
            # to signals.
            while is_alive(thread):
                thread.join(timeout=0.1)
            finished.append(sum(thread.result))

    q = Queue(6)
    prod_thread = threading.Thread(target=producer, args=(q, files))
    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
    start = timeit.default_timer()
    prod_thread.start()
    cons_thread.start()
    while is_alive(prod_thread):
        prod_thread.join(timeout=0.1)
    while is_alive(cons_thread):
        cons_thread.join(timeout=0.1)
    return (sum(finished) / (timeit.default_timer() - start))
class FilePutter(threading.Thread):
    """Thread class for putting a URL

    After the thread has run, ``result`` is the number of bytes uploaded,
    or 0 when the upload was skipped or failed.
    """

    def __init__(self, url, start, size):
        """Prepare the request body for an upload of about ``size`` bytes.

        ``start`` is a ``timeit.default_timer()`` value; run() skips the
        upload when more than 10 seconds have passed since then.
        """
        self.url = url
        chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
        data = chars * (int(round(int(size) / 36.0)))
        # The 'content1=' prefix is 9 characters, hence the ``- 9``.
        self.data = ('content1=%s' % data[0:int(size) - 9]).encode()
        del data
        self.result = None
        self.starttime = start
        threading.Thread.__init__(self)

    def run(self):
        """Send the prepared body and record how many bytes were sent.

        IOError is turned into a result of 0 on purpose: a failed upload
        simply does not count.
        """
        try:
            if ((timeit.default_timer() - self.starttime) <= 10 and
                    not shutdown_event.isSet()):
                request = build_request(self.url, data=self.data)
                f = urlopen(request)
                try:
                    f.read(11)
                finally:
                    # Close the response even when read() raises; the
                    # original leaked it on that path.
                    f.close()
                self.result = len(self.data)
            else:
                self.result = 0
        except IOError:
            self.result = 0
def uploadSpeed(url, sizes, quiet=False):
    """Function to launch FilePutter threads and calculate upload speeds

    ``sizes`` is a sequence of payload sizes in bytes; one FilePutter
    thread posting to ``url`` is started per size, with at most 6 queued
    at a time. Unless ``quiet`` is true, a dot is written to stdout for
    every thread started. Returns the total number of bytes sent divided
    by the elapsed seconds.
    """

    def is_alive(thread):
        # Thread.isAlive() was removed in Python 3.9, while is_alive()
        # does not exist before Python 2.6: use whichever is available.
        check = getattr(thread, 'is_alive', None) or thread.isAlive
        return check()

    def producer(q, sizes):
        for size in sizes:
            # ``start`` is the enclosing function's timestamp, assigned
            # just before this producer thread is started.
            thread = FilePutter(url, start, size)
            thread.start()
            q.put(thread, True)
            if not quiet and not shutdown_event.isSet():
                sys.stdout.write('.')
                sys.stdout.flush()

    finished = []

    def consumer(q, total_sizes):
        while len(finished) < total_sizes:
            thread = q.get(True)
            # Join in short slices so the main thread stays responsive
            # to signals.
            while is_alive(thread):
                thread.join(timeout=0.1)
            finished.append(thread.result)

    q = Queue(6)
    prod_thread = threading.Thread(target=producer, args=(q, sizes))
    cons_thread = threading.Thread(target=consumer, args=(q, len(sizes)))
    start = timeit.default_timer()
    prod_thread.start()
    cons_thread.start()
    while is_alive(prod_thread):
        prod_thread.join(timeout=0.1)
    while is_alive(cons_thread):
        cons_thread.join(timeout=0.1)
    return (sum(finished) / (timeit.default_timer() - start))
def getAttributesByTagName(dom, tagName):
    """Return the attributes of the first ``tagName`` element as a dict.

    Only used with xml.dom.minidom, which is likely only to be used
    with python versions older than 2.5. Raises IndexError when the
    document contains no such element.
    """
    first_match = dom.getElementsByTagName(tagName)[0]
    attribute_pairs = first_match.attributes.items()
    return dict(attribute_pairs)
def getConfig():
    """Download the speedtest.net configuration and return only the data
    we are interested in

    Returns a dict with the keys 'client', 'times', 'download' and
    'upload', each mapping to the attribute dict of the XML element of
    that name, or None when the HTTP status is not 200. Prints a message
    and exits with status 1 when the configuration cannot be retrieved or
    parsed.
    """
    request = build_request('://www.speedtest.net/speedtest-config.php')
    uh, e = catch_request(request)
    if e:
        print_('Could not retrieve speedtest.net configuration: %s' % e)
        sys.exit(1)

    # Always close the handle: the original leaked it on a non-200 status
    # and when read() raised.
    try:
        configxml = []
        while 1:
            configxml.append(uh.read(10240))
            if len(configxml[-1]) == 0:
                break
        status = int(uh.code)
    finally:
        uh.close()
    if status != 200:
        return None

    payload = ''.encode().join(configxml)
    sections = ('client', 'times', 'download', 'upload')
    try:
        if ET is not None:
            root = ET.fromstring(payload)
            config = dict((name, root.find(name).attrib)
                          for name in sections)
        else:
            # ElementTree is unavailable (ET is None), so the import
            # fallback provided xml.dom.minidom as DOM.
            root = DOM.parseString(payload)
            config = dict((name, getAttributesByTagName(root, name))
                          for name in sections)
    except (SyntaxError, AttributeError, IndexError):
        # SyntaxError: malformed XML. AttributeError / IndexError: one of
        # the expected elements is missing from the document.
        print_('Failed to parse speedtest.net configuration')
        sys.exit(1)
    return config
def closestServers(client, all=False):
    """Determine the 5 closest speedtest.net servers based on geographic
    distance

    ``client`` is a mapping with 'lat' and 'lon' entries. The server list
    is fetched from the first of several URLs that yields any servers.
    Returns a list of server attribute dicts sorted by distance, each
    with an added 'd' entry (distance in km); limited to 5 entries unless
    ``all`` is true. Prints the collected errors and exits with status 1
    when no URL yields a server list.
    """
    urls = [
        '://www.speedtest.net/speedtest-servers-static.php',
        '://c.speedtest.net/speedtest-servers-static.php',
        '://www.speedtest.net/speedtest-servers.php',
        '://c.speedtest.net/speedtest-servers.php',
    ]
    errors = []
    servers = {}
    for url in urls:
        try:
            request = build_request(url)
            uh, e = catch_request(request)
            if e:
                errors.append('%s' % e)
                raise SpeedtestCliServerListError

            # Close the handle on every path, including a failing read().
            try:
                serversxml = []
                while 1:
                    serversxml.append(uh.read(10240))
                    if len(serversxml[-1]) == 0:
                        break
                status = int(uh.code)
            finally:
                uh.close()
            if status != 200:
                raise SpeedtestCliServerListError

            payload = ''.encode().join(serversxml)
            try:
                if ET is not None:
                    root = ET.fromstring(payload)
                    # Element.getiterator() was removed in Python 3.9;
                    # prefer iter() and keep getiterator() only for
                    # interpreters too old to have iter().
                    find_all = getattr(root, 'iter', None) or root.getiterator
                    elements = find_all('server')
                else:
                    # ElementTree is unavailable (ET is None), so the
                    # import fallback provided xml.dom.minidom as DOM.
                    root = DOM.parseString(payload)
                    elements = root.getElementsByTagName('server')
            except SyntaxError:
                raise SpeedtestCliServerListError

            for server in elements:
                try:
                    attrib = server.attrib
                except AttributeError:
                    # minidom element: build the dict from its attributes.
                    attrib = dict(list(server.attributes.items()))
                d = distance([float(client['lat']),
                              float(client['lon'])],
                             [float(attrib.get('lat')),
                              float(attrib.get('lon'))])
                attrib['d'] = d
                # Several servers can share the same distance.
                servers.setdefault(d, []).append(attrib)
        except SpeedtestCliServerListError:
            continue

        # We were able to fetch and parse the list of speedtest.net servers
        if servers:
            break

    if not servers:
        print_('Failed to retrieve list of speedtest.net servers:\n\n %s' %
               '\n'.join(errors))
        sys.exit(1)

    closest = []
    for d in sorted(servers.keys()):
        for s in servers[d]:
            closest.append(s)
            if len(closest) == 5 and not all:
                break
        else:
            continue
        # Only reached when the inner loop hit its limit.
        break

    return closest
def getBestServer(servers):
    """Perform a speedtest.net latency request to determine which
    speedtest.net server has the lowest latency

    ``servers`` is an iterable of server dicts, each with a 'url' entry.
    Every server's ``latency.txt`` is requested three times; a failed or
    unexpected response counts as 3600 seconds. Returns the server dict
    with the lowest score, with the score stored under 'latency'.

    Raises IndexError when ``servers`` is empty; speedtest() relies on
    that to report an invalid server ID.
    """
    results = {}
    for server in servers:
        cum = []
        url = '%s/latency.txt' % os.path.dirname(server['url'])
        urlparts = urlparse(url)
        for i in range(0, 3):
            if urlparts[0] == 'https':
                h = HTTPSConnection(urlparts[1])
            else:
                h = HTTPConnection(urlparts[1])
            # The outer try guarantees the connection is closed on every
            # path; the original skipped close() when the request failed.
            try:
                try:
                    headers = {'User-Agent': user_agent}
                    start = timeit.default_timer()
                    h.request("GET", urlparts[2], headers=headers)
                    r = h.getresponse()
                    total = (timeit.default_timer() - start)
                    # Reading the body can fail as well; treat that as a
                    # failed sample instead of crashing.
                    text = r.read(9)
                except (HTTPError, URLError, socket.error):
                    cum.append(3600)
                else:
                    if int(r.status) == 200 and text == 'test=test'.encode():
                        cum.append(total)
                    else:
                        cum.append(3600)
            finally:
                h.close()
        avg = round((sum(cum) / 6) * 1000, 3)
        results[avg] = server

    # Deliberately not min(): indexing an empty list raises the IndexError
    # that callers expect for an empty ``servers``.
    fastest = sorted(results.keys())[0]
    best = results[fastest]
    best['latency'] = fastest

    return best
def ctrl_c(signum, frame):
    """Signal handler for Ctrl-C.

    Sets the global ``shutdown_event`` so the worker threads stop, then
    exits by raising SystemExit with a cancellation message. ``signum``
    and ``frame`` are the standard signal-handler arguments and are not
    used.
    """
    shutdown_event.set()
    raise SystemExit('\nCancelling...')
def version():
    """Exit the program, reporting ``__version__`` as the exit message."""
    sys.exit(__version__)
def speedtest():
    """Run the full speedtest.net test

    Steps: parse the command line, fetch the speedtest.net configuration
    and server list, pick a server (--server, --mini, or lowest latency),
    measure download and upload speed, and optionally submit the results
    to obtain a share URL (--share). Output goes to stdout; failures
    print a message and exit with status 1.
    """

    global shutdown_event, source, scheme
    shutdown_event = threading.Event()

    signal.signal(signal.SIGINT, ctrl_c)

    def printable(text):
        # Python 2 gets UTF-8 bytes, as before. On Python 3 the text is
        # passed through unchanged, because printing bytes there would
        # show a b'...' repr instead of the text.
        if sys.version_info[0] < 3:
            return text.encode('utf-8', 'ignore')
        return text

    description = (
        'Command line interface for testing internet bandwidth using '
        'speedtest.net.\n'
        '------------------------------------------------------------'
        '--------------\n'
        'https://github.com/sivel/speedtest-cli')

    parser = ArgParser(description=description)
    # Give optparse.OptionParser an `add_argument` method for
    # compatibility with argparse.ArgumentParser
    try:
        parser.add_argument = parser.add_option
    except AttributeError:
        pass
    parser.add_argument('--bytes', dest='units', action='store_const',
                        const=('byte', 1), default=('bit', 8),
                        help='Display values in bytes instead of bits. Does '
                             'not affect the image generated by --share')
    parser.add_argument('--share', action='store_true',
                        help='Generate and provide a URL to the speedtest.net '
                             'share results image')
    parser.add_argument('--simple', action='store_true',
                        help='Suppress verbose output, only show basic '
                             'information')
    parser.add_argument('--list', action='store_true',
                        help='Display a list of speedtest.net servers '
                             'sorted by distance')
    parser.add_argument('--server', help='Specify a server ID to test against')
    parser.add_argument('--mini', help='URL of the Speedtest Mini server')
    parser.add_argument('--source', help='Source IP address to bind to')
    parser.add_argument('--timeout', default=10, type=int,
                        help='HTTP timeout in seconds. Default 10')
    parser.add_argument('--secure', action='store_true',
                        help='Use HTTPS instead of HTTP when communicating '
                             'with speedtest.net operated servers')
    parser.add_argument('--version', action='store_true',
                        help='Show the version number and exit')

    # optparse returns an (options, args) tuple, argparse a namespace.
    options = parser.parse_args()
    if isinstance(options, tuple):
        args = options[0]
    else:
        args = options
    del options

    # Print the version and exit
    if args.version:
        version()

    socket.setdefaulttimeout(args.timeout)

    # Pre-cache the user agent string
    build_user_agent()

    # If specified bind to a specific IP address
    if args.source:
        source = args.source
        socket.socket = bound_socket

    if args.secure:
        scheme = 'https'

    if not args.simple:
        print_('Retrieving speedtest.net configuration...')
    try:
        config = getConfig()
    except URLError:
        print_('Cannot retrieve speedtest configuration')
        sys.exit(1)
    if config is None:
        # getConfig() returns None for a non-200 response; without this
        # check the config['client'] lookups below fail with a TypeError.
        print_('Cannot retrieve speedtest configuration')
        sys.exit(1)

    if not args.simple:
        print_('Retrieving speedtest.net server list...')
    if args.list or args.server:
        servers = closestServers(config['client'], True)
        if args.list:
            serverList = []
            for server in servers:
                line = ('%(id)4s) %(sponsor)s (%(name)s, %(country)s) '
                        '[%(d)0.2f km]' % server)
                serverList.append(line)
            print_(printable('\n'.join(serverList)))
            sys.exit(0)
    else:
        servers = closestServers(config['client'])

    if not args.simple:
        print_('Testing from %(isp)s (%(ip)s)...' % config['client'])

    if args.server:
        try:
            best = getBestServer(filter(lambda x: x['id'] == args.server,
                                        servers))
        except IndexError:
            print_('Invalid server ID')
            sys.exit(1)
    elif args.mini:
        ext = os.path.splitext(args.mini)[1]
        if ext:
            url = os.path.dirname(args.mini)
        else:
            url = args.mini
        urlparts = urlparse(url)
        try:
            request = build_request(args.mini)
            f = urlopen(request)
        except Exception:
            # ``Exception`` rather than a bare except, so the SystemExit
            # raised by the Ctrl-C handler is not reported as a bad URL.
            print_('Invalid Speedtest Mini URL')
            sys.exit(1)
        else:
            text = f.read()
            f.close()

        extension = re.findall('upload_extension: "([^"]+)"',
                               text.decode('utf-8', 'ignore'))
        if not extension:
            # Probe the known upload script extensions one by one.
            for ext in ['php', 'asp', 'aspx', 'jsp']:
                try:
                    request = build_request('%s/speedtest/upload.%s' %
                                            (args.mini, ext))
                    f = urlopen(request)
                except Exception:
                    continue
                # Decode before matching: on Python 3 read() returns
                # bytes, which a str regex pattern cannot be applied to.
                data = f.read().strip().decode('utf-8', 'ignore')
                code = f.code
                f.close()
                if (code == 200 and
                        len(data.splitlines()) == 1 and
                        re.match('size=[0-9]', data)):
                    extension = [ext]
                    break
        if not urlparts or not extension:
            print_('Please provide the full URL of your Speedtest Mini server')
            sys.exit(1)
        servers = [{
            'sponsor': 'Speedtest Mini',
            'name': urlparts[1],
            'd': 0,
            'url': '%s/speedtest/upload.%s' % (url.rstrip('/'), extension[0]),
            'latency': 0,
            'id': 0
        }]
        try:
            best = getBestServer(servers)
        except Exception:
            # Best effort: fall back to the Mini server without a
            # measured latency.
            best = servers[0]
    else:
        if not args.simple:
            print_('Selecting best server based on latency...')
        best = getBestServer(servers)

    if not args.simple:
        print_(printable('Hosted by %(sponsor)s (%(name)s) [%(d)0.2f km]: '
                         '%(latency)s ms' % best))
    else:
        print_('Ping: %(latency)s ms' % best)

    # Download test: four requests for each image size.
    sizes = [350, 500, 750, 1000, 1500, 2000, 2500, 3000, 3500, 4000]
    urls = []
    for size in sizes:
        for i in range(0, 4):
            urls.append('%s/random%sx%s.jpg' %
                        (os.path.dirname(best['url']), size, size))
    if not args.simple:
        print_('Testing download speed', end='')
    dlspeed = downloadSpeed(urls, args.simple)
    if not args.simple:
        print_()
    print_('Download: %0.2f M%s/s' %
           ((dlspeed / 1000 / 1000) * args.units[1], args.units[0]))

    # Upload test: 25 posts for each payload size.
    sizesizes = [int(.25 * 1000 * 1000), int(.5 * 1000 * 1000)]
    sizes = []
    for size in sizesizes:
        for i in range(0, 25):
            sizes.append(size)
    if not args.simple:
        print_('Testing upload speed', end='')
    ulspeed = uploadSpeed(best['url'], sizes, args.simple)
    if not args.simple:
        print_()
    print_('Upload: %0.2f M%s/s' %
           ((ulspeed / 1000 / 1000) * args.units[1], args.units[0]))

    if args.share and args.mini:
        print_('Cannot generate a speedtest.net share results image while '
               'testing against a Speedtest Mini server')
    elif args.share:
        dlspeedk = int(round((dlspeed / 1000) * 8, 0))
        ping = int(round(best['latency'], 0))
        ulspeedk = int(round((ulspeed / 1000) * 8, 0))

        # Build the request to send results back to speedtest.net
        # We use a list instead of a dict because the API expects parameters
        # in a certain order
        apiData = [
            'download=%s' % dlspeedk,
            'ping=%s' % ping,
            'upload=%s' % ulspeedk,
            'promo=',
            'startmode=%s' % 'pingselect',
            'recommendedserverid=%s' % best['id'],
            'accuracy=%s' % 1,
            'serverid=%s' % best['id'],
            'hash=%s' % md5(('%s-%s-%s-%s' %
                             (ping, ulspeedk, dlspeedk, '297aae72'))
                            .encode()).hexdigest()]

        headers = {'Referer': 'http://c.speedtest.net/flash/speedtest.swf'}
        request = build_request('://www.speedtest.net/api/api.php',
                                data='&'.join(apiData).encode(),
                                headers=headers)
        f, e = catch_request(request)
        if e:
            print_('Could not submit results to speedtest.net: %s' % e)
            sys.exit(1)
        response = f.read()
        code = f.code
        f.close()

        if int(code) != 200:
            print_('Could not submit results to speedtest.net')
            sys.exit(1)

        qsargs = parse_qs(response.decode())
        resultid = qsargs.get('resultid')
        if not resultid or len(resultid) != 1:
            print_('Could not submit results to speedtest.net')
            sys.exit(1)

        print_('Share results: %s://www.speedtest.net/result/%s.png' %
               (scheme, resultid[0]))
def main():
    """Script entry point.

    Runs speedtest() and prints a cancellation notice instead of a
    traceback when it is interrupted by KeyboardInterrupt.
    """
    try:
        speedtest()
    except KeyboardInterrupt:
        print_('\nCancelling...')


# Run the test only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
# vim:ts=4:sw=4:expandtab |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment