Skip to content

Instantly share code, notes, and snippets.

@phwelo
Created August 16, 2019 13:36
Show Gist options
  • Save phwelo/bed59e03bb6cd91d6c8612f92cc337bb to your computer and use it in GitHub Desktop.
Save phwelo/bed59e03bb6cd91d6c8612f92cc337bb to your computer and use it in GitHub Desktop.
Speedtest fork with args pulled out and halo used for progress indication
#!/usr/bin/env python
import builtins
import csv
import datetime
import errno
import gzip
import json
import math
import os
import platform
import re
import signal
import socket
import ssl
import sys
import threading
import timeit
import xml.parsers.expat
try:
    import xml.etree.cElementTree
    import xml.etree.cElementTree as ET
except ImportError:
    # ``cElementTree`` was removed in Python 3.9; ``ElementTree`` offers the
    # same API.
    import xml.etree.ElementTree
    import xml.etree.ElementTree as ET
from argparse import ArgumentParser as ArgParser
from argparse import SUPPRESS as ARG_SUPPRESS
from hashlib import md5
from http.client import HTTPConnection, BadStatusLine, HTTPSConnection
from io import StringIO, BytesIO
from io import TextIOWrapper, FileIO
from queue import Queue
from urllib.parse import (urlparse, parse_qs)
from urllib.request import (urlopen, Request, HTTPError, URLError,
                            AbstractHTTPHandler, ProxyHandler,
                            HTTPDefaultErrorHandler, HTTPRedirectHandler,
                            HTTPErrorProcessor, OpenerDirector)

from halo import Halo
# Module-wide constants and switches.

# Release identifier embedded in the User-Agent string.
__version__ = '2.1.1'

# Debug switch for the command line front end.
DEBUG = False

# Sentinel meaning "no explicit timeout was passed" (compared by identity).
_GLOBAL_DEFAULT_TIMEOUT = object()

# Converters handed to the argument parser's ``type=`` keyword.
PARSER_TYPE_INT = int
PARSER_TYPE_STR = str
PARSER_TYPE_FLOAT = float

# Base class for the gzip-decoding response wrapper.
GZIP_BASE = gzip.GzipFile
class FakeShutdownEvent(object):
    """Placeholder event object whose ``isSet`` always answers ``False``.

    Used when no real shutdown event is supplied, so callers can poll
    ``isSet()`` unconditionally.
    """

    @staticmethod
    def isSet():
        """Dummy method to always return false"""
        return False
# Exception types this module treats as a failed HTTP exchange.
HTTP_ERRORS = (
    HTTPError,
    URLError,
    socket.error,
    ssl.SSLError,
    BadStatusLine,
)
class SpeedtestException(Exception):
    """Root of every exception raised by this module."""


class SpeedtestCLIError(SpeedtestException):
    """Generic failure raised while running the command line interface."""


class SpeedtestHTTPError(SpeedtestException):
    """Root of the HTTP-related exceptions of this module."""


class SpeedtestConfigError(SpeedtestException):
    """The configuration XML is invalid."""


class SpeedtestServersError(SpeedtestException):
    """The servers XML is invalid."""


class ConfigRetrievalError(SpeedtestHTTPError):
    """config.php could not be retrieved."""


class ServersRetrievalError(SpeedtestHTTPError):
    """speedtest-servers.php could not be retrieved."""


class InvalidServerIDType(SpeedtestException):
    """A server ID used for filtering was not an integer."""


class NoMatchedServers(SpeedtestException):
    """Filtering left no servers."""


class SpeedtestMiniConnectFailure(SpeedtestException):
    """The provided speedtest mini server could not be reached."""


class InvalidSpeedtestMiniServer(SpeedtestException):
    """The provided server does not appear to be a speedtest mini server."""


class SpeedtestUploadTimeout(SpeedtestException):
    """The configured test length was reached during upload."""


class SpeedtestBestServerFailure(SpeedtestException):
    """The best server could not be determined."""


class SpeedtestMissingBestServer(SpeedtestException):
    """get_best_server was not called, or could not determine a server."""
def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT,
                      source_address=None):
    """Open a TCP connection to ``address`` and return the connected socket.

    Every address returned by ``socket.getaddrinfo`` is tried in order; the
    first one that connects wins.

    :param address: ``(host, port)`` pair to connect to.
    :param timeout: Socket timeout in seconds; when left at the sentinel
        default no timeout is set on the socket.
    :param source_address: Optional ``(host, port)`` pair to bind to before
        connecting.
    :returns: The connected socket.
    :raises socket.error: The error of the last attempt when no address
        could be connected, or a generic error when ``getaddrinfo``
        returned nothing.
    """
    host, port = address
    err = None
    for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
        af, socktype, proto, _canonname, sa = res
        sock = None
        try:
            sock = socket.socket(af, socktype, proto)
            if timeout is not _GLOBAL_DEFAULT_TIMEOUT:
                sock.settimeout(float(timeout))
            if source_address:
                sock.bind(source_address)
            sock.connect(sa)
            return sock
        except socket.error as exc:
            # Remember the failure, release the half-open socket and move on
            # to the next candidate address.
            err = exc
            if sock is not None:
                sock.close()
    if err is not None:
        raise err
    raise socket.error('getaddrinfo returns an empty list')
class SpeedtestHTTPConnection(HTTPConnection):
    """``HTTPConnection`` that stores ``source_address`` and ``timeout``
    itself and opens its socket through ``create_connection``.
    """

    def __init__(self, *args, **kwargs):
        # Pull out the two keywords handled here so the base class never
        # sees them; everything else is forwarded unchanged.
        bind_to = kwargs.pop('source_address', None)
        wait = kwargs.pop('timeout', 10)
        HTTPConnection.__init__(self, *args, **kwargs)
        self.source_address, self.timeout = bind_to, wait

    def connect(self):
        """Connect to the host and port given at construction time."""
        endpoint = (self.host, self.port)
        self.sock = create_connection(
            endpoint, self.timeout, self.source_address
        )
# Only define the HTTPS variant when ``HTTPSConnection`` is truthy.
if HTTPSConnection:
    class SpeedtestHTTPSConnection(HTTPSConnection, SpeedtestHTTPConnection):
        """HTTPS connection accepting ``source_address`` and ``timeout``
        keyword arguments.

        The plain socket is opened by ``SpeedtestHTTPConnection.connect`` and
        then wrapped for TLS.
        """
        def __init__(self, *args, **kwargs):
            # Remove the keywords handled here before delegating; the
            # remaining arguments go to ``HTTPSConnection`` untouched.
            source_address = kwargs.pop('source_address', None)
            timeout = kwargs.pop('timeout', 10)
            HTTPSConnection.__init__(self, *args, **kwargs)
            self.timeout = timeout
            self.source_address = source_address
        def connect(self):
            """Open the TCP socket, then wrap it with SSL.

            Raises ``SpeedtestException`` when the ``ssl`` module is falsy.
            """
            SpeedtestHTTPConnection.connect(self)
            if ssl:
                try:
                    kwargs = {}
                    if hasattr(ssl, 'SSLContext'):
                        kwargs['server_hostname'] = self.host
                    # ``self._context`` is not assigned in this class;
                    # presumably ``HTTPSConnection.__init__`` sets it - confirm.
                    self.sock = self._context.wrap_socket(self.sock, **kwargs)
                except AttributeError:
                    # Fallback used when the context path raised
                    # ``AttributeError``.
                    # NOTE(review): the nesting of the inner ``try`` under this
                    # handler is reconstructed from a source whose indentation
                    # was lost - confirm against the intended layout.
                    self.sock = ssl.wrap_socket(self.sock)
                    try:
                        self.sock.server_hostname = self.host
                    except AttributeError:
                        pass
            else:
                raise SpeedtestException(
                    'This version of Python does not support HTTPS/SSL '
                    'functionality'
                )
def _build_connection(connection, source_address, timeout, context=None):
    """Return a ``factory(host, **kwargs)`` that calls ``connection``.

    The factory forces ``source_address`` and ``timeout`` into the keyword
    arguments, adds ``context`` only when it is truthy, and returns whatever
    ``connection`` returns.
    """
    def inner(host, **kwargs):
        kwargs['source_address'] = source_address
        kwargs['timeout'] = timeout
        if context:
            kwargs['context'] = context
        return connection(host, **kwargs)

    return inner
class SpeedtestHTTPHandler(AbstractHTTPHandler):
    """HTTP handler whose connections are built with this handler's
    ``source_address`` and ``timeout``.
    """

    def __init__(self, debuglevel=0, source_address=None, timeout=10):
        AbstractHTTPHandler.__init__(self, debuglevel)
        self.timeout = timeout
        self.source_address = source_address

    def http_open(self, req):
        """Open ``req`` over a ``SpeedtestHTTPConnection``."""
        factory = _build_connection(
            SpeedtestHTTPConnection, self.source_address, self.timeout
        )
        return self.do_open(factory, req)

    http_request = AbstractHTTPHandler.do_request_
class SpeedtestHTTPSHandler(AbstractHTTPHandler):
    """HTTPS handler whose connections are built with this handler's
    ``source_address``, ``timeout`` and optional ``context``.
    """

    def __init__(self, debuglevel=0, context=None, source_address=None,
                 timeout=10):
        AbstractHTTPHandler.__init__(self, debuglevel)
        self._context = context
        self.timeout = timeout
        self.source_address = source_address

    def https_open(self, req):
        """Open ``req`` over a ``SpeedtestHTTPSConnection``."""
        factory = _build_connection(
            SpeedtestHTTPSConnection,
            self.source_address,
            self.timeout,
            context=self._context,
        )
        return self.do_open(factory, req)

    https_request = AbstractHTTPHandler.do_request_
def build_opener(source_address=None, timeout=10):
    """Assemble an ``OpenerDirector`` wired with this module's handlers.

    ``source_address`` (a host string) is turned into a ``(host, 0)`` pair
    for the HTTP and HTTPS handlers; ``timeout`` is passed to both. The
    opener sends the User-Agent produced by ``build_user_agent``.
    """
    bind_to = (source_address, 0) if source_address else None

    chain = (
        ProxyHandler(),
        SpeedtestHTTPHandler(source_address=bind_to, timeout=timeout),
        SpeedtestHTTPSHandler(source_address=bind_to, timeout=timeout),
        HTTPDefaultErrorHandler(),
        HTTPRedirectHandler(),
        HTTPErrorProcessor(),
    )

    opener = OpenerDirector()
    opener.addheaders = [('User-agent', build_user_agent())]
    for item in chain:
        opener.add_handler(item)
    return opener
class GzipDecodedResponse(GZIP_BASE):
    """Gzip reader over a fully buffered copy of ``response``.

    The whole response is read into an in-memory buffer first; decompressed
    data is then read from this object.
    """

    def __init__(self, response):
        self.io = BytesIO()
        # Drain the response in 1 KiB pieces until it reports end of data.
        piece = response.read(1024)
        while len(piece) != 0:
            self.io.write(piece)
            piece = response.read(1024)
        self.io.seek(0)
        gzip.GzipFile.__init__(self, mode='rb', fileobj=self.io)

    def close(self):
        """Close the gzip reader, then always close the backing buffer."""
        try:
            gzip.GzipFile.close(self)
        finally:
            self.io.close()
def get_exception():
    """Return the exception instance currently being handled, if any."""
    _exc_type, exc_value, _traceback = sys.exc_info()
    return exc_value
def distance(origin, destination):
    """Determine distance between 2 sets of [lat,lon] in km"""
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371  # km

    sin_half_dlat = math.sin(math.radians(lat2 - lat1) / 2)
    sin_half_dlon = math.sin(math.radians(lon2 - lon1) / 2)

    # Haversine term, multiplied in the same order as the two-factor form.
    a = (sin_half_dlat * sin_half_dlat +
         math.cos(math.radians(lat1)) *
         math.cos(math.radians(lat2)) * sin_half_dlon *
         sin_half_dlon)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return radius * c
def build_user_agent():
    """Build a Mozilla/5.0 compatible User-Agent string"""
    system_part = '(%s; U; %s; en-us)' % (platform.platform(),
                                          platform.architecture()[0])
    python_part = 'Python/%s' % platform.python_version()
    client_part = 'speedtest-cli/%s' % __version__
    pieces = [
        'Mozilla/5.0',
        system_part,
        python_part,
        '(KHTML, like Gecko)',
        client_part,
    ]
    return ' '.join(pieces)
def build_request(url, data=None, headers=None, bump='0', secure=False):
    """Build a ``Request`` with a cache-busting query parameter.

    :param url: Target URL. When it starts with ``:`` (e.g.
        ``://host/path``) the scheme is prepended: ``https`` if ``secure``
        is truthy, otherwise ``http``.
    :param data: Optional request body passed through to ``Request``.
    :param headers: Optional mapping of extra headers. It is copied, so the
        caller's mapping is never modified.
    :param bump: Suffix appended after the millisecond timestamp in the
        ``x=<timestamp>.<bump>`` parameter.
    :param secure: Selects the scheme for scheme-less URLs.
    :returns: The prepared ``Request`` carrying ``Cache-Control: no-cache``.
    """
    # Work on a private copy: callers reuse their header dicts.
    headers = dict(headers) if headers else {}

    if url.startswith(':'):
        scheme = 'https' if secure else 'http'
        schemed_url = '%s%s' % (scheme, url)
    else:
        schemed_url = url

    delim = '&' if '?' in url else '?'

    # ``timeit`` imports ``time``, so the wall clock is reachable through it
    # without a separate import.
    final_url = '%s%sx=%s.%s' % (schemed_url, delim,
                                 int(timeit.time.time() * 1000), bump)
    headers['Cache-Control'] = 'no-cache'
    return Request(final_url, data=data, headers=headers)
def catch_request(request, opener=None):
    """Open ``request`` and report HTTP failures as a value, not a raise.

    :param request: The request to open.
    :param opener: Optional object whose ``open`` method is used; when
        omitted, ``urlopen`` is used instead.
    :returns: ``(response, False)`` on success, or ``(None, exception)``
        when one of ``HTTP_ERRORS`` was raised.
    """
    _open = opener.open if opener else urlopen
    try:
        return _open(request), False
    except HTTP_ERRORS as exc:
        return None, exc
def get_response_stream(response):
    """Return a readable stream for ``response``.

    When the ``content-encoding`` header equals ``gzip`` the response is
    wrapped in ``GzipDecodedResponse``; otherwise it is returned as is.
    """
    try:
        lookup = response.headers.getheader
    except AttributeError:
        # No ``headers.getheader`` available: ask the response itself.
        lookup = response.getheader

    encoding = lookup('content-encoding')
    if encoding != 'gzip':
        return response
    return GzipDecodedResponse(response)
def get_attributes_by_tag_name(dom, tag_name):
    """Return the attributes of the first ``tag_name`` element in ``dom``
    as a plain dictionary.
    """
    first = dom.getElementsByTagName(tag_name)[0]
    return {key: value for key, value in first.attributes.items()}
class HTTPDownloader(threading.Thread):
    """Thread that fetches one request and records the size of each chunk
    read into ``self.result``.
    """

    def __init__(self, i, request, start, timeout, opener=None,
                 shutdown_event=None):
        threading.Thread.__init__(self)
        self.i = i
        self.request = request
        self.starttime = start
        self.timeout = timeout
        # Chunk sizes read so far; starts at [0] so it can always be summed.
        self.result = [0]
        self._opener = opener.open if opener else urlopen
        self._shutdown_event = (
            shutdown_event if shutdown_event else FakeShutdownEvent()
        )

    def _in_time(self):
        """Tell whether the time since ``starttime`` is within ``timeout``."""
        return (timeit.default_timer() - self.starttime) <= self.timeout

    def run(self):
        """Read the response in 10240-byte chunks until it ends, the
        timeout passes or shutdown is requested. ``IOError`` is ignored.
        """
        try:
            if not self._in_time():
                return
            f = self._opener(self.request)
            while not self._shutdown_event.isSet() and self._in_time():
                size = len(f.read(10240))
                self.result.append(size)
                if size == 0:
                    break
            f.close()
        except IOError:
            pass
class HTTPUploaderData(object):
    """File like object to improve cutting off the upload once the timeout
    has been reached

    :param length: Number of bytes the upload body must contain.
    :param start: Timer value the timeout is measured from.
    :param timeout: Seconds after ``start`` during which reads are allowed.
    :param shutdown_event: Optional object with ``isSet()``; once it reports
        true, reads stop.
    """

    def __init__(self, length, start, timeout, shutdown_event=None):
        self.length = length
        self.start = start
        self.timeout = timeout
        if shutdown_event:
            self._shutdown_event = shutdown_event
        else:
            self._shutdown_event = FakeShutdownEvent()
        self._data = None
        # Sizes of the chunks handed out so far.
        self.total = [0]

    def pre_allocate(self):
        """Build the in-memory upload body of exactly ``length`` bytes.

        :raises SpeedtestCLIError: When the body does not fit in memory.
        """
        chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
        # Round the repeat count up: rounding to nearest can leave the
        # payload shorter than ``length``. The slice below trims any excess.
        multiplier = int(math.ceil(int(self.length) / 36.0))
        try:
            self._data = BytesIO(
                ('content1=%s' %
                 (chars * multiplier)[0:int(self.length) - 9]
                 ).encode()
            )
        except MemoryError:
            raise SpeedtestCLIError(
                'Insufficient memory to pre-allocate upload data. Please '
                'use --no-pre-allocate'
            )

    @property
    def data(self):
        """The upload buffer, created on first access if needed."""
        if self._data is None:
            self.pre_allocate()
        return self._data

    def read(self, n=10240):
        """Return the next ``n`` bytes and record the size in ``total``.

        :raises SpeedtestUploadTimeout: When the timeout has passed or
            shutdown was requested.
        """
        if ((timeit.default_timer() - self.start) <= self.timeout and
                not self._shutdown_event.isSet()):
            chunk = self.data.read(n)
            self.total.append(len(chunk))
            return chunk
        else:
            raise SpeedtestUploadTimeout()

    def __len__(self):
        return self.length
class HTTPUploader(threading.Thread):
    """Thread class for putting a URL

    :param i: Index of this upload.
    :param request: Request whose ``data`` object exposes ``start``,
        ``total`` and ``read``.
    :param start: Timer value the timeout is measured from; also written to
        ``request.data.start``.
    :param size: Size of the upload body in bytes.
    :param timeout: Seconds after ``start`` during which the upload may
        begin.
    :param opener: Optional object whose ``open`` method sends the request;
        ``urlopen`` is used when omitted.
    :param shutdown_event: Optional object with ``isSet()``.
    """

    def __init__(self, i, request, start, size, timeout, opener=None,
                 shutdown_event=None):
        threading.Thread.__init__(self)
        self.request = request
        self.request.data.start = self.starttime = start
        self.size = size
        self.result = None
        self.timeout = timeout
        self.i = i
        # Fall back to ``urlopen`` like ``HTTPDownloader`` does, instead of
        # failing on the ``None`` default.
        if opener:
            self._opener = opener.open
        else:
            self._opener = urlopen
        if shutdown_event:
            self._shutdown_event = shutdown_event
        else:
            self._shutdown_event = FakeShutdownEvent()

    def run(self):
        """Send the request and store the number of bytes read from the
        upload body in ``self.result`` (``0`` when the upload never began).
        """
        request = self.request
        try:
            if ((timeit.default_timer() - self.starttime) <= self.timeout and
                    not self._shutdown_event.isSet()):
                try:
                    f = self._opener(request)
                except TypeError:
                    # The opener rejected the file-like body: retry with the
                    # body read into memory.
                    request = build_request(self.request.get_full_url(),
                                            data=request.data.read(self.size))
                    f = self._opener(request)
                try:
                    f.read(11)
                finally:
                    f.close()
                self.result = sum(self.request.data.total)
            else:
                self.result = 0
        except (IOError, SpeedtestUploadTimeout):
            self.result = sum(self.request.data.total)
class SpeedtestResults(object):
    """Container for the outcome of a speedtest:

    Download speed
    Upload speed
    Ping/Latency to test server
    Data about the server and the client
    Byte counters and a UTC timestamp taken at creation

    ``dict()`` returns the public fields as a dictionary.
    """

    # Public fields, in the order ``dict()`` reports them.
    _FIELDS = ('download', 'upload', 'ping', 'server', 'timestamp',
               'bytes_sent', 'bytes_received', 'client')

    def __init__(self, download=0, upload=0, ping=0, server=None, client=None,
                 opener=None, secure=False):
        self.download = download
        self.upload = upload
        self.ping = ping
        self.server = {} if server is None else server
        self.client = client or {}

        self._share = None
        self.timestamp = '%sZ' % datetime.datetime.utcnow().isoformat()
        self.bytes_received = 0
        self.bytes_sent = 0

        self._opener = opener if opener else build_opener()
        self._secure = secure

    def __repr__(self):
        return repr(self.dict())

    def dict(self):
        """Return dictionary of result data"""
        return {field: getattr(self, field) for field in self._FIELDS}
class Speedtest(object):
    """Class for performing standard speedtest.net testing operations

    :param config: Optional mapping merged over the downloaded
        configuration.
    :param source_address: Optional local address to bind connections to.
    :param timeout: HTTP timeout in seconds.
    :param secure: Use ``https`` for scheme-less speedtest.net URLs.
    :param shutdown_event: Optional object with ``isSet()`` used to stop
        transfer threads early.
    """

    def __init__(self, config=None, source_address=None, timeout=10,
                 secure=False, shutdown_event=None):
        self.config = {}
        self._source_address = source_address
        self._timeout = timeout
        self._opener = build_opener(source_address, timeout)
        self._secure = secure
        if shutdown_event:
            self._shutdown_event = shutdown_event
        else:
            self._shutdown_event = FakeShutdownEvent()
        self.get_config()
        if config is not None:
            self.config.update(config)
        self.servers = {}
        self.closest = []
        self._best = {}
        self.results = SpeedtestResults(
            client=self.config['client'],
            opener=self._opener,
            secure=secure,
        )

    @property
    def best(self):
        """The selected server, determined on first access."""
        if not self._best:
            self.get_best_server()
        return self._best

    @staticmethod
    def _read_all(stream, error_cls):
        """Read ``stream`` to its end in 1 KiB chunks and return the bytes.

        Read failures (``OSError``/``EOFError``) are raised as ``error_cls``.
        """
        chunks = []
        while True:
            try:
                chunk = stream.read(1024)
            except (OSError, EOFError) as exc:
                raise error_cls(exc)
            if len(chunk) == 0:
                break
            chunks.append(chunk)
        return b''.join(chunks)

    @staticmethod
    def _wait_for(thread):
        """Block until ``thread`` has finished, joining in 0.1 s slices."""
        while thread.is_alive():
            thread.join(timeout=0.1)

    def get_config(self):
        """Download the speedtest.net configuration and return only the data
        we are interested in

        :returns: ``self.config``, or ``None`` when the response status is
            not 200.
        :raises ConfigRetrievalError: When the request or the read fails.
        :raises SpeedtestConfigError: When the XML is malformed, lacks a
            required element, or the client location is not numeric.
        """
        headers = {}
        if gzip:
            headers['Accept-Encoding'] = 'gzip'
        request = build_request('://www.speedtest.net/speedtest-config.php',
                                headers=headers, secure=self._secure)
        uh, e = catch_request(request, opener=self._opener)
        if e:
            raise ConfigRetrievalError(e)

        stream = get_response_stream(uh)
        configxml = self._read_all(stream, ConfigRetrievalError)
        stream.close()
        uh.close()

        if int(uh.code) != 200:
            return None

        try:
            root = ET.fromstring(configxml)
        except ET.ParseError as exc:
            raise SpeedtestConfigError(
                'Malformed speedtest.net configuration: %s' % exc
            )
        try:
            # ``find`` returns None for a missing element, which makes
            # ``.attrib`` raise AttributeError.
            server_config = root.find('server-config').attrib
            download = root.find('download').attrib
            upload = root.find('upload').attrib
            client = root.find('client').attrib
        except AttributeError:
            raise SpeedtestConfigError(
                'Malformed speedtest.net configuration: '
                'missing required element'
            )

        # ``ignoreids`` may be empty; skip blank entries instead of failing
        # on ``int('')``.
        ignore_servers = [
            int(sid) for sid in server_config['ignoreids'].split(',') if sid
        ]

        ratio = int(upload['ratio'])
        upload_max = int(upload['maxchunkcount'])
        up_sizes = [32768, 65536, 131072, 262144, 524288, 1048576, 7340032]
        sizes = {
            'upload': up_sizes[ratio - 1:],
            'download': [350, 500, 750, 1000, 1500, 2000, 2500,
                         3000, 3500, 4000]
        }

        size_count = len(sizes['upload'])
        upload_count = int(math.ceil(upload_max / size_count))

        counts = {
            'upload': upload_count,
            'download': int(download['threadsperurl'])
        }
        threads = {
            'upload': int(upload['threads']),
            'download': int(server_config['threadcount']) * 2
        }
        length = {
            'upload': int(upload['testlength']),
            'download': int(download['testlength'])
        }

        self.config.update({
            'client': client,
            'ignore_servers': ignore_servers,
            'sizes': sizes,
            'counts': counts,
            'threads': threads,
            'length': length,
            'upload_max': upload_count * size_count
        })

        try:
            self.lat_lon = (float(client['lat']), float(client['lon']))
        except ValueError:
            raise SpeedtestConfigError(
                'Unknown location: lat=%r lon=%r' %
                (client.get('lat'), client.get('lon'))
            )
        return self.config

    def get_servers(self, servers=None, exclude=None):
        """Retrieve the list of speedtest.net servers, optionally filtered
        to servers matching those specified in the ``servers`` argument

        Entries of ``servers`` and ``exclude`` are converted to ``int`` in
        place. The server list URLs are tried in order; the first one that
        can be retrieved is used.

        :returns: ``self.servers``, a mapping of distance in km to the list
            of servers at that distance.
        :raises InvalidServerIDType: When an ID cannot be converted to int.
        :raises SpeedtestServersError: When the server XML is malformed.
        :raises NoMatchedServers: When a filter was given and nothing
            matched.
        """
        if servers is None:
            servers = []
        if exclude is None:
            exclude = []

        self.servers.clear()

        for server_list in (servers, exclude):
            for i, s in enumerate(server_list):
                try:
                    server_list[i] = int(s)
                except ValueError:
                    raise InvalidServerIDType(
                        '%s is an invalid server type, must be int' % s
                    )

        urls = [
            '://www.speedtest.net/speedtest-servers-static.php',
            'http://c.speedtest.net/speedtest-servers-static.php',
            '://www.speedtest.net/speedtest-servers.php',
            'http://c.speedtest.net/speedtest-servers.php',
        ]

        headers = {}
        if gzip:
            headers['Accept-Encoding'] = 'gzip'

        for url in urls:
            try:
                request = build_request(
                    '%s?threads=%s' % (url,
                                       self.config['threads']['download']),
                    headers=headers,
                    secure=self._secure
                )
                uh, e = catch_request(request, opener=self._opener)
                if e:
                    raise ServersRetrievalError()

                stream = get_response_stream(uh)
                serversxml = self._read_all(stream, ServersRetrievalError)
                stream.close()
                uh.close()

                if int(uh.code) != 200:
                    raise ServersRetrievalError()

                try:
                    root = ET.fromstring(serversxml)
                except ET.ParseError as exc:
                    raise SpeedtestServersError(
                        'Malformed speedtest.net server list: %s' % exc
                    )

                # ``Element.iter`` replaces ``getiterator``, which no longer
                # exists on current Python versions.
                for server in root.iter('server'):
                    attrib = server.attrib
                    server_id = int(attrib.get('id'))
                    if servers and server_id not in servers:
                        continue
                    if (server_id in self.config['ignore_servers']
                            or server_id in exclude):
                        continue
                    try:
                        d = distance(self.lat_lon,
                                     (float(attrib.get('lat')),
                                      float(attrib.get('lon'))))
                    except Exception:
                        # Servers without a usable location are skipped.
                        continue
                    attrib['d'] = d
                    self.servers.setdefault(d, []).append(attrib)
                break
            except ServersRetrievalError:
                continue

        if (servers or exclude) and not self.servers:
            raise NoMatchedServers()
        return self.servers

    def get_closest_servers(self, limit=5):
        """Limit servers to the closest speedtest.net servers based on
        geographic distance

        ``self.closest`` is rebuilt on every call, so repeated calls return
        the same selection instead of accumulating entries.

        :param limit: Maximum number of servers to keep.
        :returns: ``self.closest``.
        """
        if not self.servers:
            self.get_servers()

        del self.closest[:]
        for d in sorted(self.servers.keys()):
            for s in self.servers[d]:
                self.closest.append(s)
                if len(self.closest) == limit:
                    break
            else:
                continue
            break
        return self.closest

    def get_best_server(self, servers=None):
        """Perform a speedtest.net "ping" to determine which speedtest.net
        server has the lowest latency

        Each candidate is probed three times; a failed probe counts as 3600
        seconds.

        :param servers: Optional list of server dicts; defaults to the
            closest servers.
        :returns: The chosen server dict, with ``latency`` added.
        :raises SpeedtestBestServerFailure: When there is no candidate.
        """
        if not servers:
            if not self.closest:
                self.get_closest_servers()
            servers = self.closest

        if self._source_address:
            source_address_tuple = (self._source_address, 0)
        else:
            source_address_tuple = None

        user_agent = build_user_agent()

        results = {}
        for server in servers:
            cum = []
            url = os.path.dirname(server['url'])
            stamp = int(timeit.time.time() * 1000)
            latency_url = '%s/latency.txt?x=%s' % (url, stamp)
            for i in range(0, 3):
                # Give every probe its own cache-busting suffix.
                this_latency_url = '%s.%s' % (latency_url, i)
                urlparts = urlparse(this_latency_url)
                if urlparts[0] == 'https':
                    connection_cls = SpeedtestHTTPSConnection
                else:
                    connection_cls = SpeedtestHTTPConnection
                h = None
                try:
                    try:
                        h = connection_cls(
                            urlparts[1],
                            source_address=source_address_tuple,
                            timeout=self._timeout
                        )
                        headers = {'User-Agent': user_agent}
                        path = '%s?%s' % (urlparts[2], urlparts[4])
                        start = timeit.default_timer()
                        h.request("GET", path, headers=headers)
                        r = h.getresponse()
                        total = (timeit.default_timer() - start)
                    except HTTP_ERRORS:
                        cum.append(3600)
                        continue

                    text = r.read(9)
                    if int(r.status) == 200 and text == 'test=test'.encode():
                        cum.append(total)
                    else:
                        cum.append(3600)
                finally:
                    # Always release the connection, also after a failure.
                    if h is not None:
                        h.close()

            # NOTE(review): three samples are divided by 6; kept as is -
            # confirm whether the halving is intentional.
            avg = round((sum(cum) / 6) * 1000.0, 3)
            results[avg] = server

        try:
            fastest = sorted(results.keys())[0]
        except IndexError:
            raise SpeedtestBestServerFailure('Unable to connect to servers to '
                                             'test latency.')
        best = results[fastest]
        best['latency'] = fastest

        self.results.ping = fastest
        self.results.server = best

        self._best.update(best)
        return best

    def download(self, threads=None):
        """Test download speed against speedtest.net
        A ``threads`` value of ``None`` will fall back to those dictated
        by the speedtest.net configuration

        :returns: The measured download speed in bits per second.
        """
        urls = []
        for size in self.config['sizes']['download']:
            for _ in range(0, self.config['counts']['download']):
                urls.append('%s/random%sx%s.jpg' %
                            (os.path.dirname(self.best['url']), size, size))

        request_count = len(urls)
        requests = []
        for i, url in enumerate(urls):
            requests.append(
                build_request(url, bump=i, secure=self._secure)
            )

        def producer(q, requests, request_count):
            # ``start`` is bound below, before this function runs.
            for i, request in enumerate(requests):
                thread = HTTPDownloader(
                    i,
                    request,
                    start,
                    self.config['length']['download'],
                    opener=self._opener,
                    shutdown_event=self._shutdown_event
                )
                thread.start()
                # The bounded queue limits how many downloads are in flight.
                q.put(thread, True)

        finished = []

        def consumer(q, request_count):
            while len(finished) < request_count:
                thread = q.get(True)
                self._wait_for(thread)
                finished.append(sum(thread.result))

        q = Queue(threads or self.config['threads']['download'])
        prod_thread = threading.Thread(target=producer,
                                       args=(q, requests, request_count))
        cons_thread = threading.Thread(target=consumer,
                                       args=(q, request_count))
        start = timeit.default_timer()
        prod_thread.start()
        cons_thread.start()
        self._wait_for(prod_thread)
        self._wait_for(cons_thread)
        stop = timeit.default_timer()

        self.results.bytes_received = sum(finished)
        self.results.download = (
            (self.results.bytes_received / (stop - start)) * 8.0
        )
        if self.results.download > 100000:
            self.config['threads']['upload'] = 8
        return self.results.download

    def upload(self, pre_allocate=True, threads=None):
        """Test upload speed against speedtest.net
        A ``threads`` value of ``None`` will fall back to those dictated
        by the speedtest.net configuration

        :param pre_allocate: Build every upload body before the test starts.
        :returns: The measured upload speed in bits per second.
        """
        sizes = []
        for size in self.config['sizes']['upload']:
            for _ in range(0, self.config['counts']['upload']):
                sizes.append(size)

        request_count = self.config['upload_max']

        requests = []
        for i, size in enumerate(sizes):
            # We set ``0`` for ``start`` and handle setting the actual
            # ``start`` in ``HTTPUploader`` to get better measurements
            data = HTTPUploaderData(
                size,
                0,
                self.config['length']['upload'],
                shutdown_event=self._shutdown_event
            )
            if pre_allocate:
                data.pre_allocate()

            headers = {'Content-length': size}
            requests.append(
                (
                    build_request(self.best['url'], data, secure=self._secure,
                                  headers=headers),
                    size
                )
            )

        def producer(q, requests, request_count):
            # ``start`` is bound below, before this function runs.
            for i, request in enumerate(requests[:request_count]):
                thread = HTTPUploader(
                    i,
                    request[0],
                    start,
                    request[1],
                    self.config['length']['upload'],
                    opener=self._opener,
                    shutdown_event=self._shutdown_event
                )
                thread.start()
                q.put(thread, True)

        finished = []

        def consumer(q, request_count):
            while len(finished) < request_count:
                thread = q.get(True)
                self._wait_for(thread)
                finished.append(thread.result)

        q = Queue(threads or self.config['threads']['upload'])
        prod_thread = threading.Thread(target=producer,
                                       args=(q, requests, request_count))
        cons_thread = threading.Thread(target=consumer,
                                       args=(q, request_count))
        start = timeit.default_timer()
        prod_thread.start()
        cons_thread.start()
        self._wait_for(prod_thread)
        self._wait_for(cons_thread)
        stop = timeit.default_timer()

        self.results.bytes_sent = sum(finished)
        self.results.upload = (
            (self.results.bytes_sent / (stop - start)) * 8.0
        )
        return self.results.upload
def ctrl_c(shutdown_event):
    """Build a signal handler for the Ctrl-C key sequence.

    The returned handler sets ``shutdown_event`` for our threaded
    operations and then exits the process with status 0.
    """
    def inner(signum, frame):
        shutdown_event.set()
        sys.exit(0)

    return inner
def parse_args(argv=None):
    """Parse the command line options.

    :param argv: Optional list of argument strings; when ``None`` the
        process arguments are parsed.
    :returns: The parsed options namespace.
    """
    description = ('Straight Forward Speed Test')
    parser = ArgParser(description=description)
    parser.add_argument('--bytes', dest='units', action='store_const',
                        const=('byte', 8), default=('bit', 1),
                        help='Display values in bytes instead of bits. Does '
                             'not affect the image generated by --share, nor '
                             'output from --json or --csv')
    parser.add_argument('--simple', action='store_true', default=False,
                        help='Suppress verbose output, only show basic '
                             'information')
    parser.add_argument('--csv', action='store_true', default=False,
                        help='Suppress verbose output, only show basic '
                             'information in CSV format. Speeds listed in '
                             'bit/s and not affected by --bytes')
    parser.add_argument('--csv-delimiter', default=',', type=PARSER_TYPE_STR,
                        help='Single character delimiter to use in CSV '
                             'output. Default ","')
    parser.add_argument('--csv-header', action='store_true', default=False,
                        help='Print CSV headers')
    parser.add_argument('--json', action='store_true', default=False,
                        help='Suppress verbose output, only show basic '
                             'information in JSON format. Speeds listed in '
                             'bit/s and not affected by --bytes')
    parser.add_argument('--list', action='store_true',
                        help='Display a list of speedtest.net servers '
                             'sorted by distance')
    parser.add_argument('--server', type=PARSER_TYPE_INT, action='append',
                        help='Specify a server ID to test against. Can be '
                             'supplied multiple times')
    parser.add_argument('--exclude', type=PARSER_TYPE_INT, action='append',
                        help='Exclude a server from selection. Can be '
                             'supplied multiple times')
    parser.add_argument('--mini', help='URL of the Speedtest Mini server')
    parser.add_argument('--source', help='Source IP address to bind to')
    parser.add_argument('--timeout', default=10, type=PARSER_TYPE_FLOAT,
                        help='HTTP timeout in seconds. Default 10')
    parser.add_argument('--secure', action='store_true',
                        help='Use HTTPS instead of HTTP when communicating '
                             'with speedtest.net operated servers')
    parser.add_argument('--no-pre-allocate', dest='pre_allocate',
                        action='store_const', default=True, const=False,
                        help='Do not pre allocate upload data. Pre allocation '
                             'is enabled by default to improve upload '
                             'performance. To support systems with '
                             'insufficient memory, use this option to avoid a '
                             'MemoryError')
    # Hidden switch: absent from the help text and from the namespace
    # unless given.
    parser.add_argument('--debug', action='store_true',
                        help=ARG_SUPPRESS, default=ARG_SUPPRESS)
    return parser.parse_args(argv)
def validate_optional_args(args):
    """Reject options whose supporting module is unavailable.

    For each option that depends on a module that may be missing: if the
    option was supplied and the module is ``None``, exit with an error
    naming what is missing.
    """
    requirements = (
        ('json', 'json/simplejson python module', json),
        ('secure', 'SSL support', HTTPSConnection),
    )
    for option, label, dependency in requirements:
        requested = getattr(args, option, False)
        if requested and dependency is None:
            raise SystemExit('%s is not installed. --%s is '
                             'unavailable' % (label, option))
def shell():
    """Run the full speedtest.net test

    Parses the command line, fetches the configuration, then either lists
    the servers (``--list``) or measures download and upload speed, showing
    progress through ``Halo`` spinners.

    :raises SpeedtestCLIError: When configuration or server retrieval fails
        or the server filter is invalid.
    """
    shutdown_event = threading.Event()
    signal.signal(signal.SIGINT, ctrl_c(shutdown_event))

    args = parse_args()
    validate_optional_args(args)

    frames = [
        "▐gs ▌",
        "▐ gs ▌",
        "▐ gs ▌",
        "▐ gs ▌",
        "▐ gs ▌",
        "▐ gs ▌",
        "▐ gs▌",
        "▐ g▌",
        "▐s ▌"
    ]
    customgs = {
        'interval': 100,
        'frames': frames
    }

    spinner = Halo(text='Getting config from speedtest.net',
                   animation='marquee', spinner=customgs)
    spinner.start()
    try:
        # Hand the event to the test object so its transfer threads can see
        # a Ctrl-C.
        speedtest = Speedtest(
            source_address=args.source,
            timeout=args.timeout,
            secure=args.secure,
            shutdown_event=shutdown_event
        )
    except (ConfigRetrievalError,) + HTTP_ERRORS:
        raise SpeedtestCLIError(get_exception())

    if args.list:
        try:
            speedtest.get_servers()
        except (ServersRetrievalError,) + HTTP_ERRORS:
            raise SpeedtestCLIError(get_exception())
        # Finish the spinner before printing so it does not overwrite the
        # listing.
        spinner.succeed('Got Configuration')
        for _, servers in sorted(speedtest.servers.items()):
            for server in servers:
                line = ('%(id)5s) %(sponsor)s (%(name)s, %(country)s) '
                        '[%(d)0.2f km]' % server)
                print(line)
        sys.exit(0)

    spinner.succeed('Got Configuration')
    spinner = Halo(text='Testing Download', animation='marquee',
                   spinner=customgs)
    spinner.start()

    # NOTE: --mini is accepted but a mini server is never configured; with
    # it the server is chosen when the download test first needs one.
    if not args.mini:
        # ``args.server`` is None unless --server was given.
        requested = ', '.join('%s' % s for s in (args.server or []))
        try:
            speedtest.get_servers(servers=args.server, exclude=args.exclude)
        except NoMatchedServers:
            raise SpeedtestCLIError(
                'No matched servers: %s' % requested
            )
        except (ServersRetrievalError,) + HTTP_ERRORS:
            raise SpeedtestCLIError(get_exception())
        except InvalidServerIDType:
            raise SpeedtestCLIError(
                '%s is an invalid server type, must '
                'be an int' % requested
            )
        speedtest.get_best_server()

    results = speedtest.results

    speedtest.download(threads=None)
    download = (results.download / 1000.0 / 1000.0) / args.units[1]
    spinner.succeed('Download: %0.2f M%s/s' % (download, args.units[0]))

    spinner = Halo(text='Testing Upload', animation='marquee',
                   spinner=customgs)
    spinner.start()
    speedtest.upload(
        pre_allocate=args.pre_allocate,
        threads=None
    )
    upload = (results.upload / 1000.0 / 1000.0) / args.units[1]
    spinner.succeed('Upload: %0.2f M%s/s' % (upload, args.units[0]))
def main():
    """Command line entry point: run ``shell`` and report failures.

    A keyboard interrupt prints a cancellation notice. Module errors and
    exits are turned into ``SystemExit('ERROR: ...')``, except exits with
    code 0 or 2, which are ignored.
    """
    try:
        shell()
    except KeyboardInterrupt:
        print('Cancelling...')
    except (SpeedtestException, SystemExit) as exc:
        # Ignore a successful exit, or argparse exit
        if getattr(exc, 'code', 1) in (0, 2):
            return
        # Fall back to the repr when the message is empty.
        msg = ('%s' % exc) or ('%r' % exc)
        raise SystemExit('ERROR: %s' % msg)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment