Created
May 21, 2021 15:01
-
-
Save Terrance/ce0fdfcb72892441df1abd5140fd56d0 to your computer and use it in GitHub Desktop.
Script to produce tables of data from a BT Home Hub 3 router.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from base64 import b64encode | |
from collections import namedtuple | |
from datetime import timedelta | |
from hashlib import md5 | |
import re | |
from bs4 import BeautifulSoup | |
from requests import Session | |
# Regex for one double-quoted JavaScript string argument. The single capture
# group holds the raw text between the quotes; backslash-escaped characters
# (including an escaped quote) are accepted inside it.
RE_PARAM = r'"((?:[^"\\]|\\.)*)"'


def _re_js_object(name, count):
    """Build a regex matching ``new <name>(...)`` with *count* quoted arguments.

    The arguments must be separated by bare commas, and each one contributes
    one capture group (see ``RE_PARAM``).
    """
    arguments = ",".join([RE_PARAM] * count)
    return r"new %s\(%s\)" % (name, arguments)
def _bool(val):
    """Return ``True`` only when *val* equals the string ``"1"``."""
    is_set = val == "1"
    return is_set
def _seconds(val):
    """Convert *val*, a number of seconds accepted by ``int``, to a ``timedelta``."""
    total = int(val)
    return timedelta(seconds=total)
def _decode(val):
    r"""Resolve backslash escape sequences (``\"``, ``\x2d``, ...) in *val*.

    The ``unicode_escape`` codec interprets its input bytes as Latin-1, so
    the text must not be encoded as UTF-8 first: doing so turns every
    non-ASCII character into mojibake.  Encoding as Latin-1 keeps characters
    up to U+00FF as single bytes, and ``backslashreplace`` rewrites anything
    above that as a ``\uXXXX`` / ``\UXXXXXXXX`` escape which the codec then
    decodes back to the original character.

    Returns the decoded string.
    """
    raw = val.encode("latin-1", "backslashreplace")
    return raw.decode("unicode_escape")
def _encrypt(val):
    """Encode the password *val* the way the login form submits it.

    The UTF-8 bytes of *val* are Base64-encoded, the MD5 hex digest of that
    is taken, and the hex digest is Base64-encoded again.  Returns ``bytes``.
    """
    inner = b64encode(val.encode("utf-8"))
    hex_digest = md5(inner).hexdigest()
    return b64encode(hex_digest.encode("utf-8"))
def _transform(funcs, args):
    """Lazily apply each converter in *funcs* to the matching value in *args*.

    A falsy converter (such as ``None``) passes its value through unchanged.
    Pairing is done with ``zip``, so the result stops at the shorter input.
    Returns a generator.
    """
    pairs = zip(funcs, args)
    return (value if not convert else convert(value) for convert, value in pairs)
def _find(cls, obj, transform, text):
    """Collect every ``new <obj>(...)`` call in *text* as an instance of *cls*.

    *transform* holds one converter (or ``None``) per constructor argument;
    its length fixes how many arguments the call must have to match.  The
    captured arguments are converted and passed positionally to *cls*.
    Returns a list, empty when nothing matches.
    """
    pattern = _re_js_object(obj, len(transform))
    found = []
    for groups in re.findall(pattern, text):
        found.append(cls(*_transform(transform, groups)))
    return found
# Record types for the data scraped from the router's pages.  For the types
# filled in by ``_find``, the field order is the positional order of the
# arguments in the matching JavaScript constructor call.
Device = namedtuple("Device", [
    "domain", "addr", "source", "mac", "name", "iface", "active", "speed", "dhcp",
])
App = namedtuple("App", ["domain", "name"])
Rule = namedtuple("Rule", [
    "domain", "proto", "intport", "extportstart", "extportend",
])
Forward = namedtuple("Forward", ["domain", "name", "mac", "addr", "device"])
Wireless = namedtuple("Wireless", [
    "domain", "enabled", "mac", "ssid", "standard",
    "channel", "autochan", "maxrate", "coex", "beacon",
    "wepkeyindex", "wepenclevel", "basicencmodes", "basicauthmode", "wpaencmodes",
    "wpaauthmode", "ieeeencmodes", "ieeeauthmode", "mixedencmodes", "mixedauthmode",
])
Connection = namedtuple("Connection", [
    "domain", "status", "conntype", "addr", "dns", "remoteaddr",
    "uptime", "username", "password",
    "type", "datasent", "datarecv", "datatx", "datarx",
])
# One row of the event log.
Entry = namedtuple("Entry", ["time", "content"])
Version = namedtuple("Version", [
    "domain", "model", "product", "serial", "hwver", "swver", "firstuse",
])
# Bundle of the four record lists read from the devices page.
DeviceInfo = namedtuple("DeviceInfo", ["devices", "apps", "rules", "forwards"])
def find_devices(text):
    """Extract every nine-argument ``stDeviceInfo`` record in *text*.

    Returns a list of ``Device`` tuples (empty when nothing matches).  The
    ``name`` field has its escape sequences decoded, ``active`` and ``dhcp``
    become booleans and ``speed`` becomes an ``int``.
    """
    def _flag(val):
        # The builtin ``bool`` is wrong for string flags: bool("0") is True,
        # so every device used to report dhcp=True.  Treat "" and "0" as
        # false and anything else as true.
        # NOTE(review): assumes the router reports this flag as "0"/"1" like
        # the ``active`` field -- confirm against a real page.
        return val not in ("", "0")

    converters = (None, None, None, None, _decode, None, _bool, int, _flag)
    return _find(Device, "stDeviceInfo", converters, text)
def find_apps(text):
    """Extract every ``stApplications`` record in *text* as ``App`` tuples.

    The ``name`` field has its escape sequences decoded.
    """
    converters = (None, _decode)
    return _find(App, "stApplications", converters, text)
def find_rules(text):
    """Extract every ``stAllPortRules`` record in *text* as ``Rule`` tuples.

    The three port fields are converted to ``int``.
    """
    converters = (None, None) + (int,) * 3
    return _find(Rule, "stAllPortRules", converters, text)
def find_forwards(text):
    """Extract every ``stAppRuleSet`` record in *text* as ``Forward`` tuples.

    The ``name`` and ``device`` fields have their escape sequences decoded.
    """
    converters = (None, _decode, None, None, _decode)
    return _find(Forward, "stAppRuleSet", converters, text)
def find_wireless(text):
    """Extract every ``stWlan`` record in *text* as ``Wireless`` tuples.

    ``enabled`` and ``autochan`` become booleans, ``ssid`` has its escape
    sequences decoded, and ``channel``, ``maxrate`` and ``coex`` become
    ``int``.  The remaining fields are kept as the raw strings.
    """
    leading = (None, _bool, None, _decode, None, int, _bool, int, int)
    # ``beacon`` and the ten encryption/authentication fields are left as-is.
    converters = leading + (None,) * 11
    return _find(Wireless, "stWlan", converters, text)
def find_connections(text):
    """Extract every ``stWanPPP`` record in *text* as ``Connection`` tuples.

    ``uptime`` becomes a ``timedelta``, ``username`` and ``password`` have
    their escape sequences decoded, and the four data counters become
    ``int``.  The remaining fields are kept as the raw strings.
    """
    converters = (None,) * 6 + (_seconds, _decode, _decode, None) + (int,) * 4
    return _find(Connection, "stWanPPP", converters, text)
def find_version(text):
    """Extract every seven-argument ``stDeviceInfo`` record in *text*.

    Returns a list of ``Version`` tuples; the ``model`` and ``product``
    fields have their escape sequences decoded.
    """
    converters = (None, _decode, _decode) + (None,) * 4
    return _find(Version, "stDeviceInfo", converters, text)
class HH3:
    """Client for the web interface of a BT Home Hub 3 router.

    Call ``login()`` first; the properties then each fetch one page of the
    interface and parse the records embedded in it.
    """

    DEFAULT_HOST = "http://192.168.1.254"

    def __init__(self, pwd, host=None):
        """Store the admin password *pwd* and the base URL *host*.

        A falsy *host* selects ``DEFAULT_HOST``.  A fresh ``Session`` is
        created to carry the cookies between requests.
        """
        self._pwd = pwd
        self._host = host if host else self.DEFAULT_HOST
        self._sess = Session()

    def _page(self, path):
        """GET *path* relative to the host and return the response text."""
        url = "{}{}".format(self._host, path)
        return self._sess.get(url).text

    def _get_token(self):
        """POST to ``GetToken.cgi`` and slice the token out of the reply.

        The token is taken as characters ``[-8:-2]`` of the response body.
        NOTE(review): presumably the reply is a short snippet ending in the
        quoted token -- confirm against a real response.
        """
        url = "{}/html/common/GetToken.cgi".format(self._host)
        body = self._sess.post(url).text
        return body[-8:-2]

    def login(self):
        """Authenticate as ``admin`` with the stored password.

        Raises ``RuntimeError`` when the session has no ``SessionID_R3``
        cookie after the login request.
        """
        cookies = self._sess.cookies
        cookies.clear()
        cookies["Language"] = "en"  # Needed to get a cookie response
        url = "{}/index/login.cgi?ActionID=96".format(self._host)
        password = _encrypt(self._pwd)
        token = self._get_token()
        form = {"Username": "admin", "Password": password, "token": token}
        self._sess.post(url, data=form)
        if "SessionID_R3" not in self._sess.cookies:
            raise RuntimeError("Login failed")

    @property
    def network(self):
        """``DeviceInfo`` of devices, apps, rules and forwards from the devices page."""
        page = self._page("/html/settings/a_devices_1.html")
        return DeviceInfo(
            find_devices(page),
            find_apps(page),
            find_rules(page),
            find_forwards(page),
        )

    @property
    def wireless(self):
        """First ``Wireless`` record on the wireless page, or ``None`` if there is none."""
        page = self._page("/html/settings/a_wireless_configuration.html")
        return next(iter(find_wireless(page)), None)

    @property
    def connections(self):
        """List of ``Connection`` records from the internet settings page."""
        page = self._page("/html/settings/a_internet.html")
        return find_connections(page)

    @property
    def logs(self):
        """List of ``Entry`` records read from the event log table.

        Only rows holding both a ``logtime`` and a ``logcontent`` cell are
        included; the text of each cell is stripped of surrounding whitespace.
        """
        page = self._page("/html/troubles/event_log.html")
        soup = BeautifulSoup(page, "html.parser")
        entries = []
        for row in soup.find_all("tr"):
            when = row.find("td", class_="logtime")
            what = row.find("td", class_="logcontent")
            if not (when and what):
                continue
            entries.append(Entry(when.text.strip(), what.text.strip()))
        return entries

    @property
    def version(self):
        """First ``Version`` record on the helpdesk page, or ``None`` if there is none."""
        page = self._page("/html/troubles/helpdesk2.html")
        return next(iter(find_version(page)), None)
if __name__ == "__main__":

    from getpass import getpass
    import os.path
    import sys

    from terminaltables import SingleTable

    def table(cls, records, pos=None, strip=True, invert=None):
        """Print *records* of the namedtuple type *cls* as a terminal table.

        Args:
            cls: namedtuple class supplying the column names and the title.
            records: one *cls* instance, an iterable of them, or ``None``
                (treated as no records, so an empty lookup prints a
                header-only table instead of raising ``TypeError``).
            pos: optional zero-based index; ``pos + 1`` is appended to the title.
            strip: drop the first field of every record when true.
            invert: show one row per field instead of one row per record;
                defaults to inverting only when there is exactly one record.
        """
        if records is None:
            # Lookups such as HH3.wireless / HH3.version give None on no match.
            records = []
        elif isinstance(records, cls):
            records = [records]
        else:
            # Materialise so len() below also works for arbitrary iterables.
            records = list(records)
        fields = cls._fields
        if strip:
            fields = fields[1:]
            records = [record[1:] for record in records]
        single = len(records) == 1
        if invert is None:
            invert = single
        title = "{}{}".format(cls.__name__, "" if single else "s")
        if invert:
            if single:
                header = ["field", "value"]
            else:
                header = ["field", *("value #{}".format(i + 1) for i in range(len(records)))]
            # Transpose: each row is a field name followed by that field's
            # value in every record.
            data = [header, *zip(fields, *records)]
        else:
            data = [fields, *records]
        if pos is not None:
            title = "{} {}".format(title, pos + 1)
        print(SingleTable(data, title).table)
if __name__ == "__main__":

    def _addr_key(device):
        """Sort key ordering dotted addresses numerically rather than as text.

        Plain string comparison puts "192.168.1.10" before "192.168.1.2".
        Parts that are not decimal numbers count as 0, so an empty or odd
        address cannot raise.
        """
        return [int(part) if part.isdecimal() else 0 for part in device.addr.split(".")]

    # Read the admin password from ~/.hh3pwd, prompting when the file is
    # missing or unreadable.
    try:
        with open(os.path.expanduser("~/.hh3pwd")) as f:
            pwd = f.read().strip()
    except (OSError, UnicodeError):
        pwd = getpass()

    h = HH3(pwd)
    h.login()

    # The first command-line argument selects the report; with no argument or
    # an unrecognised one the version table is shown.
    cmd = sys.argv[1] if len(sys.argv) > 1 else None
    if cmd == "ip":
        # First non-empty address among the connections, or None.
        print(next(filter(None, (c.addr for c in h.connections)), None))
    elif cmd == "logs":
        table(Entry, h.logs, strip=False)
    elif cmd == "wireless":
        table(Wireless, h.wireless)
    elif cmd == "conn":
        table(Connection, h.connections, invert=True)
    elif cmd == "devices":
        table(Device, sorted(h.network.devices, key=_addr_key))
    else:
        table(Version, h.version)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment