-
-
Save ojpojao/41deb33ec83ea4d726aea92b65f45f9e to your computer and use it in GitHub Desktop.
A CWMP server to traverse and dump properties of a client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dataclasses | |
import logging | |
import socket | |
import threading | |
import urllib.request | |
from typing import List | |
import inflection as inflection | |
from datetime import datetime | |
import coloredlogs | |
import xmltodict | |
import pprint | |
import queue | |
@dataclasses.dataclass | |
class DeviceIdStruct: | |
Manufacturer: str | |
OUI: str | |
ProductClass: str | |
SerialNumber: str | |
@dataclasses.dataclass | |
class EventStruct: | |
EventCode: str | |
CommandKey: str | |
@dataclasses.dataclass | |
class ParameterValueStruct: | |
Name: str | |
Value: ... | |
@dataclasses.dataclass | |
class ParameterInfoStruct: | |
Name: str | |
Writable: bool | |
@dataclasses.dataclass | |
class ParameterAttributeStruct: | |
Name: str | |
Notification: int | |
AccessList: List[str] | |
def get_as_string(body): | |
if isinstance(body, str): | |
return body | |
elif isinstance(body, dict): | |
if '#text' in body: | |
return body['#text'] | |
else: | |
return '' | |
else: | |
return None | |
def parse_type(body, wanted_type=None): | |
if wanted_type is not None: | |
wanted_array = '[' in wanted_type | |
wanted_type = wanted_type.split('[')[0] | |
else: | |
wanted_array = False | |
if body is not None and '@xsi:type' in body: | |
typ = body['@xsi:type'] | |
else: | |
assert wanted_type is not None, 'Value does not have an explicit type, please provide one' | |
if wanted_array: | |
typ = 'SOAP-ENC:Array' | |
else: | |
typ = wanted_type | |
wanted_type = None | |
# Check for array and verify if we wanted an array or not | |
is_array = False | |
count = None | |
if typ == 'SOAP-ENC:Array': | |
typ = body['@SOAP-ENC:arrayType'] | |
typ, count = typ.split('[', 1) | |
assert count.endswith(']') | |
count = int(count[:-1]) | |
is_array = True | |
if wanted_type is not None: | |
assert wanted_array, f'Wanted `{wanted_type}`, got array of `{typ}`' | |
else: | |
if wanted_type is not None: | |
assert not wanted_array, f'Wanted array of `{wanted_type}`, got `{typ}`' | |
# Verify the type | |
if wanted_type is not None: | |
assert wanted_type == typ, f'wanted `{wanted_type}`, got `{typ}`' | |
# Handle array type properly | |
if is_array: | |
typ_name = typ.split(':')[-1] | |
values = [] | |
if typ_name in body: | |
lst = body[typ_name] | |
if isinstance(lst, list): | |
for value in lst: | |
values.append(parse_type(value, typ)) | |
else: | |
values.append(parse_type(lst, typ)) | |
assert len(values) == count, f'got more items than expected (got = {len(values)}, expected = {count})' | |
return values | |
# CWMP types | |
elif typ == 'cwmp:DeviceIdStruct': | |
return DeviceIdStruct( | |
Manufacturer=parse_type(body['Manufacturer'], wanted_type='xsd:string'), | |
OUI=parse_type(body['OUI'], wanted_type='xsd:string'), | |
ProductClass=parse_type(body['ProductClass'], wanted_type='xsd:string'), | |
SerialNumber=parse_type(body['SerialNumber'], wanted_type='xsd:string'), | |
) | |
elif typ == 'cwmp:ParameterInfoStruct': | |
return ParameterInfoStruct( | |
Name=parse_type(body['Name'], wanted_type='xsd:string'), | |
Writable=parse_type(body['Writable'], wanted_type='xsd:boolean') | |
) | |
elif typ == 'cwmp:ParameterAttributeStruct': | |
return ParameterAttributeStruct( | |
Name=parse_type(body['Name'], wanted_type='xsd:string'), | |
Notification=parse_type(body['Notification'], wanted_type='xsd:int'), | |
AccessList=parse_type(body['AccessList'], wanted_type='xsd:string[]') | |
) | |
elif typ == 'cwmp:EventStruct': | |
return EventStruct( | |
EventCode=parse_type(body['EventCode'], wanted_type='xsd:string'), | |
CommandKey=parse_type(body['CommandKey'], wanted_type='xsd:string') | |
) | |
elif typ == 'cwmp:ParameterValueStruct': | |
return ParameterValueStruct( | |
Name=parse_type(body['Name'], wanted_type='xsd:string'), | |
Value=parse_type(body['Value']) | |
) | |
# SOAP types | |
elif typ == 'xsd:string': | |
return get_as_string(body) | |
elif typ == 'xsd:unsignedInt': | |
return int(get_as_string(body)) | |
elif typ == 'xsd:int': | |
return int(get_as_string(body)) | |
elif typ == 'xsd:boolean': | |
return False if int(get_as_string(body)) == 0 else True | |
elif typ == 'xsd:dateTime': | |
return datetime.strptime(get_as_string(body), '%Y-%m-%dT%H:%M:%S') | |
else: | |
assert False, f'unknown type: `{typ}` (wanted `{wanted_type}`) -- {body}' | |
######################################################################################################################## | |
REQUESTS = { | |
'cwmp:Inform': { | |
'DeviceId': 'cwmp:DeviceIdStruct', | |
'Event': 'cwmp:EventStruct[]', | |
'MaxEnvelopes': 'xsd:unsignedInt', | |
'CurrentTime': 'xsd:dateTime', | |
'RetryCount': 'xsd:unsignedInt', | |
'ParameterList': 'cwmp:ParameterValueStruct[]' | |
}, | |
'cwmp:GetRPCMethodsResponse': { | |
'MethodList': 'xsd:string[]' | |
}, | |
'cwmp:GetParameterValuesResponse': { | |
'ParameterList': 'cwmp:ParameterValueStruct[]' | |
}, | |
'cwmp:GetParameterNamesResponse': { | |
'ParameterList': 'cwmp:ParameterInfoStruct[]' | |
}, | |
'cwmp:GetParameterAttributesResponse': { | |
'ParameterList': 'cwmp:ParameterAttributeStruct[]' | |
}, | |
'cwmp:DeleteObjectResponse': { | |
'Status': 'xsd:int' | |
} | |
} | |
def parse_soap(xml): | |
xml = xmltodict.parse(xml) | |
envelope = xml['SOAP-ENV:Envelope'] | |
headers = envelope['SOAP-ENV:Header'] | |
request_id = None | |
for header in headers: | |
value = headers[header] | |
if header == 'cwmp:ID': | |
request_id = value['#text'] | |
else: | |
assert '@SOAP-ENV:mustUnderstand' not in value, f'unknown header key: `{header}`' | |
# Get the body nicely | |
body = envelope['SOAP-ENV:Body'] | |
body = list(body.items()) | |
assert len(body) == 1 | |
body = body[0] | |
req = body[0] | |
body = body[1] | |
# Check the method is known | |
assert req in REQUESTS, f'Unknown request: `{req}`' | |
# Parse them | |
arguments = {} | |
for argument in body: | |
typ = REQUESTS[req][argument] | |
arguments[inflection.underscore(argument)] = parse_type(body[argument], wanted_type=typ) | |
# Return it all | |
return request_id, req, arguments | |
def build_soap(request_id, response_typ, response_arguments): | |
s = '' | |
s += '<?xml version="1.0" encoding="UTF-8" ?>\n' | |
s += '<SOAP-ENV:Envelope\n' | |
s += ' SOAP-ENV:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"\n' | |
s += ' xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"\n' | |
s += ' xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/"\n' | |
s += ' xmlns:xsd="http://www.w3.org/2001/XMLSchema"\n' | |
s += ' xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"\n' | |
s += ' xmlns:cwmp="urn:dslforum-org:cwmp-1-0"\n' | |
s += '>\n' | |
# Build the header | |
s += '\t<SOAP-ENV:Header>\n' | |
if request_id is not None: | |
s += f'\t\t<cwmp:ID SOAP-ENV:mustUnderstand="1">{request_id}</cwmp:ID>\n' | |
s += '\t</SOAP-ENV:Header>\n' | |
# Build the body | |
s += '\t<SOAP-ENV:Body>\n' | |
s += f'\t\t<{response_typ}>\n' | |
for arg in response_arguments: | |
value = response_arguments[arg] | |
if isinstance(value, str): | |
s += f'\t\t\t<{arg}>{value}</{arg}>\n' | |
elif isinstance(value, bool): | |
s += f'\t\t\t<{arg}>{"1" if value else "0"}</{arg}>\n' | |
elif isinstance(value, int): | |
s += f'\t\t\t<{arg}>{value}</{arg}>\n' | |
elif isinstance(value, list): | |
# TODO: not array of strings | |
typ = 'xsd:string' | |
typ_name = typ.split(':')[-1] | |
s += f'\t\t\t<{arg} SOAP-ENC:arrayType="{typ}[{len(value)}]">\n' | |
for item in value: | |
s += f'\t\t\t\t<{typ_name}>{item}</{typ_name}>\n' | |
s += f'\t\t\t</{arg}>\n' | |
else: | |
assert False, f"Unknown type in response `{type(value)}`" | |
s += f'\t\t</{response_typ}>\n' | |
s += '\t</SOAP-ENV:Body>\n' | |
s += '</SOAP-ENV:Envelope>' | |
return s | |
######################################################################################################################## | |
def param_list_to_dict(param_list: List[ParameterValueStruct]): | |
d = {} | |
for param in param_list: | |
d[param.Name] = param.Value | |
return d | |
def handle_inform( | |
current_time: datetime, | |
device_id: DeviceIdStruct, | |
event: List[EventStruct], | |
max_envelopes: int, | |
parameter_list: List[ParameterValueStruct], | |
retry_count: int | |
): | |
assert len(event) == 1 | |
return 'cwmp:InformResponse', { | |
'MaxEnvelopes': 1 | |
} | |
######################################################################################################################## | |
HTTP_DEFAULT_ENCODING = 'ISO-8859-1' | |
HTTP_RESPONSE_CODES = { | |
200: 'OK', | |
405: 'Method Not Allowed' | |
} | |
def send_http_response(c: socket.socket, status, encoding, headers=None, body=None): | |
# Send the status_line | |
c.send(f'HTTP/1.1 {200} {HTTP_RESPONSE_CODES[status]}\r\n'.encode(HTTP_DEFAULT_ENCODING)) | |
# Setup the body to be bytes | |
if body is not None: | |
if isinstance(body, str): | |
body = body.encode(encoding) | |
else: | |
body = b'' | |
if headers is None: | |
headers = {} | |
# Send the default headers | |
c.send('Connection: Keep-Alive\r\n'.encode(HTTP_DEFAULT_ENCODING)) | |
c.send(f'Content-Length: {len(body)}\r\n'.encode(HTTP_DEFAULT_ENCODING)) | |
c.send(f'Content-Type: text/xml; charset="{encoding}"\r\n'.encode(HTTP_DEFAULT_ENCODING)) | |
# User headers | |
for header in headers: | |
c.send(f'{header}: {headers[header]}\r\n'.encode(HTTP_DEFAULT_ENCODING)) | |
# Final CRLF | |
c.send('\r\n'.encode(HTTP_DEFAULT_ENCODING)) | |
# Send the body | |
c.send(body) | |
def read_line(c: socket.socket, encoding=HTTP_DEFAULT_ENCODING): | |
line = bytearray() | |
got_cr = False | |
while True: | |
b = c.recv(1) | |
if len(b) == 0: | |
break | |
if got_cr: | |
if b == b'\n': | |
break | |
if b == b'\r': | |
got_cr = True | |
else: | |
got_cr = False | |
line.append(ord(b)) | |
line = line.decode(encoding).strip() | |
return line | |
def read_http_request(c: socket.socket): | |
# Parse the start line | |
start_line = read_line(c) | |
if len(start_line) == 0: | |
return None, None, None, None, None | |
start_line = start_line.split(' ', 2) | |
http_method = start_line[0].strip() | |
request_target = start_line[1].strip() | |
http_version = start_line[2].strip() | |
assert http_version.lower() == 'http/1.1', http_version | |
# Read the headers properly, | |
headers = {} | |
while True: | |
line = read_line(c) | |
if line == '': | |
break | |
header, content = line.split(':', 1) | |
headers[header.lower()] = content.strip() | |
# Figure the encoding | |
encoding = HTTP_DEFAULT_ENCODING | |
if 'content-type' in headers: | |
for key in headers['content-type'].split(';'): | |
parts = key.strip().split('=') | |
if parts[0] == 'charset': | |
encoding = parts[1].strip() | |
# Read the body if any | |
body = None | |
if 'content-length' in headers: | |
content_length = int(headers['content-length']) | |
body = bytes() | |
while len(body) != content_length: | |
body += c.recv(content_length - len(body)) | |
body = body.decode(encoding) | |
return http_method, request_target, headers, encoding, body | |
def start_server(command_queue: queue.Queue, response_queue: queue.Queue): | |
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) | |
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) | |
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
server.bind(('0.0.0.0', 8087)) | |
server.listen(1) | |
while True: | |
c, addr = server.accept() | |
c.setblocking(True) | |
expected_response = '' | |
while True: | |
http_method, request_target, headers, encoding, body = read_http_request(c) | |
if http_method is None: | |
break | |
# Only accept posts | |
if http_method != 'POST': | |
send_http_response(c, 405, encoding) | |
continue | |
# To the root | |
if request_target != '/': | |
continue | |
# This has a SOAP body | |
if 'soapaction' in headers: | |
request_id, req, arguments = parse_soap(body) | |
# Got an inform | |
if req == 'cwmp:Inform': | |
response_typ, response_arguments = handle_inform(**arguments) | |
response = build_soap(request_id, response_typ, response_arguments) | |
send_http_response(c, 200, encoding, {}, response) | |
continue | |
# Got the response we wanted to get | |
elif req == expected_response: | |
response_queue.put(arguments) | |
# if we got to here then we can send another request | |
action, arguments = command_queue.get() | |
expected_response = action + 'Response' | |
send_http_response(c, 200, encoding, {}, build_soap(None, action, arguments)) | |
# Set up the server | |
command_queue = queue.Queue() | |
response_queue = queue.Queue() | |
thread = threading.Thread(daemon=True, target=start_server, args=(command_queue, response_queue)) | |
thread.start() | |
# And now the "client" | |
def get_rpc_methods(): | |
command_queue.put(('cwmp:GetRPCMethods', [])) | |
return response_queue.get()['method_list'] | |
def get_parameter_names(parameter_path: str, next_level=False): | |
command_queue.put(('cwmp:GetParameterNames', { | |
'ParameterPath': parameter_path, | |
'NextLevel': next_level | |
})) | |
return response_queue.get()['parameter_list'] | |
def get_parameter_values(parameter_names: List[str]): | |
command_queue.put(('cwmp:GetParameterValues', { | |
'ParameterNames': parameter_names | |
})) | |
raw_params = response_queue.get()['parameter_list'] | |
params = {} | |
for param in raw_params: | |
params[param.Name] = param.Value | |
return params | |
def delete_object(object_name: str, parameter_key: str): | |
command_queue.put(('cwmp:DeleteObject', { | |
'ObjectName': object_name, | |
'ParameterKey': parameter_key | |
})) | |
return response_queue.get()['status'] | |
def print_tree(param_name, writable=False, last=True, header=''): | |
elbow = "└──" | |
pipe = "│ " | |
tee = "├──" | |
blank = " " | |
name = param_name.split('.') | |
if name[-1] == '': | |
name = name[-2] | |
else: | |
name = name[-1] | |
# start = header + (elbow if last else tee) + name + (' (writable)' if writable else '') | |
start = param_name + (' (writable)' if writable else '') | |
if param_name.endswith('.'): | |
print(start) | |
children = get_parameter_names(param_name, True) | |
for i, c in enumerate(children): | |
print_tree(c.Name, c.Writable, header=header + (blank if last else pipe), last=i == len(children) - 1) | |
else: | |
value = get_parameter_values([param_name])[param_name] | |
print(start + ': ' + repr(value)) | |
# pprint.pp(get_rpc_methods()) | |
# print_tree('InternetGatewayDevice.') | |
pprint.pp(delete_object('InternetGatewayDevice.LANDevice.1.Hosts.Host.1.', '')) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment