Skip to content

Instantly share code, notes, and snippets.

@ojpojao
Forked from Itay2805/cwmp.py
Created August 25, 2024 06:07
Show Gist options
  • Save ojpojao/41deb33ec83ea4d726aea92b65f45f9e to your computer and use it in GitHub Desktop.
Save ojpojao/41deb33ec83ea4d726aea92b65f45f9e to your computer and use it in GitHub Desktop.
A CWMP server to traverse and dump properties of a client
import dataclasses
import logging
import socket
import threading
import urllib.request
from typing import List
import inflection as inflection
from datetime import datetime
import coloredlogs
import xmltodict
import pprint
import queue
@dataclasses.dataclass
class DeviceIdStruct:
Manufacturer: str
OUI: str
ProductClass: str
SerialNumber: str
@dataclasses.dataclass
class EventStruct:
EventCode: str
CommandKey: str
@dataclasses.dataclass
class ParameterValueStruct:
Name: str
Value: ...
@dataclasses.dataclass
class ParameterInfoStruct:
Name: str
Writable: bool
@dataclasses.dataclass
class ParameterAttributeStruct:
Name: str
Notification: int
AccessList: List[str]
def get_as_string(body):
if isinstance(body, str):
return body
elif isinstance(body, dict):
if '#text' in body:
return body['#text']
else:
return ''
else:
return None
def parse_type(body, wanted_type=None):
if wanted_type is not None:
wanted_array = '[' in wanted_type
wanted_type = wanted_type.split('[')[0]
else:
wanted_array = False
if body is not None and '@xsi:type' in body:
typ = body['@xsi:type']
else:
assert wanted_type is not None, 'Value does not have an explicit type, please provide one'
if wanted_array:
typ = 'SOAP-ENC:Array'
else:
typ = wanted_type
wanted_type = None
# Check for array and verify if we wanted an array or not
is_array = False
count = None
if typ == 'SOAP-ENC:Array':
typ = body['@SOAP-ENC:arrayType']
typ, count = typ.split('[', 1)
assert count.endswith(']')
count = int(count[:-1])
is_array = True
if wanted_type is not None:
assert wanted_array, f'Wanted `{wanted_type}`, got array of `{typ}`'
else:
if wanted_type is not None:
assert not wanted_array, f'Wanted array of `{wanted_type}`, got `{typ}`'
# Verify the type
if wanted_type is not None:
assert wanted_type == typ, f'wanted `{wanted_type}`, got `{typ}`'
# Handle array type properly
if is_array:
typ_name = typ.split(':')[-1]
values = []
if typ_name in body:
lst = body[typ_name]
if isinstance(lst, list):
for value in lst:
values.append(parse_type(value, typ))
else:
values.append(parse_type(lst, typ))
assert len(values) == count, f'got more items than expected (got = {len(values)}, expected = {count})'
return values
# CWMP types
elif typ == 'cwmp:DeviceIdStruct':
return DeviceIdStruct(
Manufacturer=parse_type(body['Manufacturer'], wanted_type='xsd:string'),
OUI=parse_type(body['OUI'], wanted_type='xsd:string'),
ProductClass=parse_type(body['ProductClass'], wanted_type='xsd:string'),
SerialNumber=parse_type(body['SerialNumber'], wanted_type='xsd:string'),
)
elif typ == 'cwmp:ParameterInfoStruct':
return ParameterInfoStruct(
Name=parse_type(body['Name'], wanted_type='xsd:string'),
Writable=parse_type(body['Writable'], wanted_type='xsd:boolean')
)
elif typ == 'cwmp:ParameterAttributeStruct':
return ParameterAttributeStruct(
Name=parse_type(body['Name'], wanted_type='xsd:string'),
Notification=parse_type(body['Notification'], wanted_type='xsd:int'),
AccessList=parse_type(body['AccessList'], wanted_type='xsd:string[]')
)
elif typ == 'cwmp:EventStruct':
return EventStruct(
EventCode=parse_type(body['EventCode'], wanted_type='xsd:string'),
CommandKey=parse_type(body['CommandKey'], wanted_type='xsd:string')
)
elif typ == 'cwmp:ParameterValueStruct':
return ParameterValueStruct(
Name=parse_type(body['Name'], wanted_type='xsd:string'),
Value=parse_type(body['Value'])
)
# SOAP types
elif typ == 'xsd:string':
return get_as_string(body)
elif typ == 'xsd:unsignedInt':
return int(get_as_string(body))
elif typ == 'xsd:int':
return int(get_as_string(body))
elif typ == 'xsd:boolean':
return False if int(get_as_string(body)) == 0 else True
elif typ == 'xsd:dateTime':
return datetime.strptime(get_as_string(body), '%Y-%m-%dT%H:%M:%S')
else:
assert False, f'unknown type: `{typ}` (wanted `{wanted_type}`) -- {body}'
########################################################################################################################
REQUESTS = {
'cwmp:Inform': {
'DeviceId': 'cwmp:DeviceIdStruct',
'Event': 'cwmp:EventStruct[]',
'MaxEnvelopes': 'xsd:unsignedInt',
'CurrentTime': 'xsd:dateTime',
'RetryCount': 'xsd:unsignedInt',
'ParameterList': 'cwmp:ParameterValueStruct[]'
},
'cwmp:GetRPCMethodsResponse': {
'MethodList': 'xsd:string[]'
},
'cwmp:GetParameterValuesResponse': {
'ParameterList': 'cwmp:ParameterValueStruct[]'
},
'cwmp:GetParameterNamesResponse': {
'ParameterList': 'cwmp:ParameterInfoStruct[]'
},
'cwmp:GetParameterAttributesResponse': {
'ParameterList': 'cwmp:ParameterAttributeStruct[]'
},
'cwmp:DeleteObjectResponse': {
'Status': 'xsd:int'
}
}
def parse_soap(xml):
xml = xmltodict.parse(xml)
envelope = xml['SOAP-ENV:Envelope']
headers = envelope['SOAP-ENV:Header']
request_id = None
for header in headers:
value = headers[header]
if header == 'cwmp:ID':
request_id = value['#text']
else:
assert '@SOAP-ENV:mustUnderstand' not in value, f'unknown header key: `{header}`'
# Get the body nicely
body = envelope['SOAP-ENV:Body']
body = list(body.items())
assert len(body) == 1
body = body[0]
req = body[0]
body = body[1]
# Check the method is known
assert req in REQUESTS, f'Unknown request: `{req}`'
# Parse them
arguments = {}
for argument in body:
typ = REQUESTS[req][argument]
arguments[inflection.underscore(argument)] = parse_type(body[argument], wanted_type=typ)
# Return it all
return request_id, req, arguments
def build_soap(request_id, response_typ, response_arguments):
s = ''
s += '<?xml version="1.0" encoding="UTF-8" ?>\n'
s += '<SOAP-ENV:Envelope\n'
s += ' SOAP-ENV:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"\n'
s += ' xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"\n'
s += ' xmlns:SOAP-ENC="http://schemas.xmlsoap.org/soap/encoding/"\n'
s += ' xmlns:xsd="http://www.w3.org/2001/XMLSchema"\n'
s += ' xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"\n'
s += ' xmlns:cwmp="urn:dslforum-org:cwmp-1-0"\n'
s += '>\n'
# Build the header
s += '\t<SOAP-ENV:Header>\n'
if request_id is not None:
s += f'\t\t<cwmp:ID SOAP-ENV:mustUnderstand="1">{request_id}</cwmp:ID>\n'
s += '\t</SOAP-ENV:Header>\n'
# Build the body
s += '\t<SOAP-ENV:Body>\n'
s += f'\t\t<{response_typ}>\n'
for arg in response_arguments:
value = response_arguments[arg]
if isinstance(value, str):
s += f'\t\t\t<{arg}>{value}</{arg}>\n'
elif isinstance(value, bool):
s += f'\t\t\t<{arg}>{"1" if value else "0"}</{arg}>\n'
elif isinstance(value, int):
s += f'\t\t\t<{arg}>{value}</{arg}>\n'
elif isinstance(value, list):
# TODO: not array of strings
typ = 'xsd:string'
typ_name = typ.split(':')[-1]
s += f'\t\t\t<{arg} SOAP-ENC:arrayType="{typ}[{len(value)}]">\n'
for item in value:
s += f'\t\t\t\t<{typ_name}>{item}</{typ_name}>\n'
s += f'\t\t\t</{arg}>\n'
else:
assert False, f"Unknown type in response `{type(value)}`"
s += f'\t\t</{response_typ}>\n'
s += '\t</SOAP-ENV:Body>\n'
s += '</SOAP-ENV:Envelope>'
return s
########################################################################################################################
def param_list_to_dict(param_list: List[ParameterValueStruct]):
d = {}
for param in param_list:
d[param.Name] = param.Value
return d
def handle_inform(
current_time: datetime,
device_id: DeviceIdStruct,
event: List[EventStruct],
max_envelopes: int,
parameter_list: List[ParameterValueStruct],
retry_count: int
):
assert len(event) == 1
return 'cwmp:InformResponse', {
'MaxEnvelopes': 1
}
########################################################################################################################
HTTP_DEFAULT_ENCODING = 'ISO-8859-1'
HTTP_RESPONSE_CODES = {
200: 'OK',
405: 'Method Not Allowed'
}
def send_http_response(c: socket.socket, status, encoding, headers=None, body=None):
# Send the status_line
c.send(f'HTTP/1.1 {200} {HTTP_RESPONSE_CODES[status]}\r\n'.encode(HTTP_DEFAULT_ENCODING))
# Setup the body to be bytes
if body is not None:
if isinstance(body, str):
body = body.encode(encoding)
else:
body = b''
if headers is None:
headers = {}
# Send the default headers
c.send('Connection: Keep-Alive\r\n'.encode(HTTP_DEFAULT_ENCODING))
c.send(f'Content-Length: {len(body)}\r\n'.encode(HTTP_DEFAULT_ENCODING))
c.send(f'Content-Type: text/xml; charset="{encoding}"\r\n'.encode(HTTP_DEFAULT_ENCODING))
# User headers
for header in headers:
c.send(f'{header}: {headers[header]}\r\n'.encode(HTTP_DEFAULT_ENCODING))
# Final CRLF
c.send('\r\n'.encode(HTTP_DEFAULT_ENCODING))
# Send the body
c.send(body)
def read_line(c: socket.socket, encoding=HTTP_DEFAULT_ENCODING):
line = bytearray()
got_cr = False
while True:
b = c.recv(1)
if len(b) == 0:
break
if got_cr:
if b == b'\n':
break
if b == b'\r':
got_cr = True
else:
got_cr = False
line.append(ord(b))
line = line.decode(encoding).strip()
return line
def read_http_request(c: socket.socket):
# Parse the start line
start_line = read_line(c)
if len(start_line) == 0:
return None, None, None, None, None
start_line = start_line.split(' ', 2)
http_method = start_line[0].strip()
request_target = start_line[1].strip()
http_version = start_line[2].strip()
assert http_version.lower() == 'http/1.1', http_version
# Read the headers properly,
headers = {}
while True:
line = read_line(c)
if line == '':
break
header, content = line.split(':', 1)
headers[header.lower()] = content.strip()
# Figure the encoding
encoding = HTTP_DEFAULT_ENCODING
if 'content-type' in headers:
for key in headers['content-type'].split(';'):
parts = key.strip().split('=')
if parts[0] == 'charset':
encoding = parts[1].strip()
# Read the body if any
body = None
if 'content-length' in headers:
content_length = int(headers['content-length'])
body = bytes()
while len(body) != content_length:
body += c.recv(content_length - len(body))
body = body.decode(encoding)
return http_method, request_target, headers, encoding, body
def start_server(command_queue: queue.Queue, response_queue: queue.Queue):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('0.0.0.0', 8087))
server.listen(1)
while True:
c, addr = server.accept()
c.setblocking(True)
expected_response = ''
while True:
http_method, request_target, headers, encoding, body = read_http_request(c)
if http_method is None:
break
# Only accept posts
if http_method != 'POST':
send_http_response(c, 405, encoding)
continue
# To the root
if request_target != '/':
continue
# This has a SOAP body
if 'soapaction' in headers:
request_id, req, arguments = parse_soap(body)
# Got an inform
if req == 'cwmp:Inform':
response_typ, response_arguments = handle_inform(**arguments)
response = build_soap(request_id, response_typ, response_arguments)
send_http_response(c, 200, encoding, {}, response)
continue
# Got the response we wanted to get
elif req == expected_response:
response_queue.put(arguments)
# if we got to here then we can send another request
action, arguments = command_queue.get()
expected_response = action + 'Response'
send_http_response(c, 200, encoding, {}, build_soap(None, action, arguments))
# Set up the server
command_queue = queue.Queue()
response_queue = queue.Queue()
thread = threading.Thread(daemon=True, target=start_server, args=(command_queue, response_queue))
thread.start()
# And now the "client"
def get_rpc_methods():
command_queue.put(('cwmp:GetRPCMethods', []))
return response_queue.get()['method_list']
def get_parameter_names(parameter_path: str, next_level=False):
command_queue.put(('cwmp:GetParameterNames', {
'ParameterPath': parameter_path,
'NextLevel': next_level
}))
return response_queue.get()['parameter_list']
def get_parameter_values(parameter_names: List[str]):
command_queue.put(('cwmp:GetParameterValues', {
'ParameterNames': parameter_names
}))
raw_params = response_queue.get()['parameter_list']
params = {}
for param in raw_params:
params[param.Name] = param.Value
return params
def delete_object(object_name: str, parameter_key: str):
command_queue.put(('cwmp:DeleteObject', {
'ObjectName': object_name,
'ParameterKey': parameter_key
}))
return response_queue.get()['status']
def print_tree(param_name, writable=False, last=True, header=''):
elbow = "└──"
pipe = "│ "
tee = "├──"
blank = " "
name = param_name.split('.')
if name[-1] == '':
name = name[-2]
else:
name = name[-1]
# start = header + (elbow if last else tee) + name + (' (writable)' if writable else '')
start = param_name + (' (writable)' if writable else '')
if param_name.endswith('.'):
print(start)
children = get_parameter_names(param_name, True)
for i, c in enumerate(children):
print_tree(c.Name, c.Writable, header=header + (blank if last else pipe), last=i == len(children) - 1)
else:
value = get_parameter_values([param_name])[param_name]
print(start + ': ' + repr(value))
# pprint.pp(get_rpc_methods())
# print_tree('InternetGatewayDevice.')
pprint.pp(delete_object('InternetGatewayDevice.LANDevice.1.Hosts.Host.1.', ''))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment