Created
May 21, 2019 04:15
-
-
Save Cr4sh/c1ca79087a95a01b8215e4230e28c53f to your computer and use it in GitHub Desktop.
Debug messages monitor for Qualcomm cellular modems
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
###############################################################
#
# Debug messages monitor for Qualcomm cellular modems.
#
# This program talks to the baseband firmware over the
# diag protocol serial port.
#
# Written by:
# Dmytro Oleksiuk (aka Cr4sh)
#
# [email protected]
# http://blog.cr4.sh
#
###############################################################
import sys, os, time, logging | |
from struct import pack, unpack | |
from threading import Thread | |
from serial import Serial | |
from optparse import OptionParser, make_option | |
# Serial port speed used when the caller does not supply a baudrate.
SERIAL_BAUDRATE = 115200

# Read timeout (seconds) handed to the serial port constructor.
SERIAL_TIMEOUT = 0.1

# strftime() pattern for the prefix of every logged message;
# set to None to log messages without a timestamp.
TIMESTAMP = '%d.%m.%y/%H:%M:%S'
class DispatcherThread(Thread):
    """Daemon thread that runs the receive loop of a diag object.

    The wrapped object only needs to expose a ``dispatch()`` method;
    it is invoked once, from the new thread, when the thread is started.
    """

    def __init__(self, diag):
        """Remember *diag* and mark the thread as a daemon."""
        Thread.__init__(self)

        # daemon flag is set before start() so the interpreter may exit
        # while the receive loop is still running
        self.daemon = True
        self.diag = diag

    def run(self):
        """Thread body: hand control to the diag object's dispatcher."""
        receive_loop = self.diag.dispatch
        receive_loop()
class DiagSerial(object):
    """Minimal read/write wrapper around the diag serial port."""

    def __init__(self, name, baudrate=None):
        """Open serial device *name*.

        *baudrate* falls back to SERIAL_BAUDRATE when it is None. The port
        is opened with the SERIAL_TIMEOUT read timeout and with both the
        ``rtscts`` and ``dsrdtr`` options switched on.
        """
        if baudrate is None:
            baudrate = SERIAL_BAUDRATE

        settings = {
            'baudrate': baudrate,
            'timeout': SERIAL_TIMEOUT,
            'rtscts': True,
            'dsrdtr': True,
        }

        self.port = Serial(name, **settings)

    def read(self, size):
        """Read up to *size* bytes from the port and return them."""
        received = self.port.read(size)
        return received

    def write(self, data):
        """Write *data* to the port; nothing is returned."""
        self.port.write(data)
class DiagProt(DiagSerial):
    """Diag protocol endpoint built on top of DiagSerial.

    Outgoing commands are framed as ``payload + CRC16``, escaped and
    terminated with 0x7e. Incoming data is split on 0x7e, unescaped,
    CRC-checked and routed to a ``recv_*`` handler by its first byte.

    ``dispatch()`` never returns and is meant to run in a background
    thread (see DispatcherThread). ``running`` is cleared when a packet
    fails its checksum or a handler raises.

    All wire data is handled as bytes, so the class works with both the
    Python 2 ``str`` and the Python 3 ``bytes`` returned by the port.
    Subclasses override ``handle_message()`` and the ``recv_*`` hooks.
    """

    CMD_MAX_SIZE = 0x1000   # number of bytes requested per serial read
    CMD_TIMEOUT = 5         # seconds send_verno() waits for the reply
    POLL_INTERVAL = 0.1     # seconds between checks while waiting

    # command codes (first byte of every packet)
    VERNO_F = 0x00
    EXT_MSG_CONFIG_F = 0x7d
    EXT_MSG_F = 0x79
    EXT_MSG_TERSE_F = 0x92

    # EXT_MSG_CONFIG_F sub-command used to set the runtime message masks
    EXT_MSG_SUBCMD_SET_RT_MASK = 0x04

    # upper bound used by the subsystem ID enumeration
    MAX_SSID = 0xcfff

    # VERNO_F reply layout: comp_date, comp_time, rel_date, rel_time,
    # model, scm, mob_cai_rev, mob_model, mob_firmware_rev,
    # slot_cycle_index, msm_ver and one trailing byte
    VERNO_FMT = '<11s8s11s8s8sBBBHBBB'
    VERNO_SIZE = 54

    # EXT_MSG_F header layout: byte, num_args, byte, timestamp, line,
    # ssid and a trailing 32-bit field
    MSG_HDR_FMT = '<BBBQHHL'
    MSG_HDR_SIZE = 19

    # byte-wise lookup table consumed by crc_16()
    crc_table = [
        0x0000, 0x1189, 0x2312, 0x329b, 0x4624, 0x57ad, 0x6536, 0x74bf,
        0x8c48, 0x9dc1, 0xaf5a, 0xbed3, 0xca6c, 0xdbe5, 0xe97e, 0xf8f7,
        0x1081, 0x0108, 0x3393, 0x221a, 0x56a5, 0x472c, 0x75b7, 0x643e,
        0x9cc9, 0x8d40, 0xbfdb, 0xae52, 0xdaed, 0xcb64, 0xf9ff, 0xe876,
        0x2102, 0x308b, 0x0210, 0x1399, 0x6726, 0x76af, 0x4434, 0x55bd,
        0xad4a, 0xbcc3, 0x8e58, 0x9fd1, 0xeb6e, 0xfae7, 0xc87c, 0xd9f5,
        0x3183, 0x200a, 0x1291, 0x0318, 0x77a7, 0x662e, 0x54b5, 0x453c,
        0xbdcb, 0xac42, 0x9ed9, 0x8f50, 0xfbef, 0xea66, 0xd8fd, 0xc974,
        0x4204, 0x538d, 0x6116, 0x709f, 0x0420, 0x15a9, 0x2732, 0x36bb,
        0xce4c, 0xdfc5, 0xed5e, 0xfcd7, 0x8868, 0x99e1, 0xab7a, 0xbaf3,
        0x5285, 0x430c, 0x7197, 0x601e, 0x14a1, 0x0528, 0x37b3, 0x263a,
        0xdecd, 0xcf44, 0xfddf, 0xec56, 0x98e9, 0x8960, 0xbbfb, 0xaa72,
        0x6306, 0x728f, 0x4014, 0x519d, 0x2522, 0x34ab, 0x0630, 0x17b9,
        0xef4e, 0xfec7, 0xcc5c, 0xddd5, 0xa96a, 0xb8e3, 0x8a78, 0x9bf1,
        0x7387, 0x620e, 0x5095, 0x411c, 0x35a3, 0x242a, 0x16b1, 0x0738,
        0xffcf, 0xee46, 0xdcdd, 0xcd54, 0xb9eb, 0xa862, 0x9af9, 0x8b70,
        0x8408, 0x9581, 0xa71a, 0xb693, 0xc22c, 0xd3a5, 0xe13e, 0xf0b7,
        0x0840, 0x19c9, 0x2b52, 0x3adb, 0x4e64, 0x5fed, 0x6d76, 0x7cff,
        0x9489, 0x8500, 0xb79b, 0xa612, 0xd2ad, 0xc324, 0xf1bf, 0xe036,
        0x18c1, 0x0948, 0x3bd3, 0x2a5a, 0x5ee5, 0x4f6c, 0x7df7, 0x6c7e,
        0xa50a, 0xb483, 0x8618, 0x9791, 0xe32e, 0xf2a7, 0xc03c, 0xd1b5,
        0x2942, 0x38cb, 0x0a50, 0x1bd9, 0x6f66, 0x7eef, 0x4c74, 0x5dfd,
        0xb58b, 0xa402, 0x9699, 0x8710, 0xf3af, 0xe226, 0xd0bd, 0xc134,
        0x39c3, 0x284a, 0x1ad1, 0x0b58, 0x7fe7, 0x6e6e, 0x5cf5, 0x4d7c,
        0xc60c, 0xd785, 0xe51e, 0xf497, 0x8028, 0x91a1, 0xa33a, 0xb2b3,
        0x4a44, 0x5bcd, 0x6956, 0x78df, 0x0c60, 0x1de9, 0x2f72, 0x3efb,
        0xd68d, 0xc704, 0xf59f, 0xe416, 0x90a9, 0x8120, 0xb3bb, 0xa232,
        0x5ac5, 0x4b4c, 0x79d7, 0x685e, 0x1ce1, 0x0d68, 0x3ff3, 0x2e7a,
        0xe70e, 0xf687, 0xc41c, 0xd595, 0xa12a, 0xb0a3, 0x8238, 0x93b1,
        0x6b46, 0x7acf, 0x4854, 0x59dd, 0x2d62, 0x3ceb, 0x0e70, 0x1ff9,
        0xf78f, 0xe606, 0xd49d, 0xc514, 0xb1ab, 0xa022, 0x92b9, 0x8330,
        0x7bc7, 0x6a4e, 0x58d5, 0x495c, 0x3de3, 0x2c6a, 0x1ef1, 0x0f78 ]

    def __init__(self, device_name, output_file=None, baudrate=None):
        """Set up logging, then open serial device *device_name*.

        *output_file*, when given, additionally receives every log record.
        *baudrate* is forwarded to DiagSerial.
        """
        self.running = True
        self.initialized = False

        self.log_init(output_file)
        self.logger.info('[+] Opening %s', device_name)

        super(DiagProt, self).__init__(device_name, baudrate=baudrate)

    def log_init(self, output_file=None):
        """Attach stdout (and optionally file) handlers to the module logger."""
        self.logger = logging.getLogger(__name__)

        if output_file:
            self.logger.addHandler(logging.FileHandler(output_file))

        self.logger.addHandler(logging.StreamHandler(sys.stdout))
        self.logger.setLevel(logging.DEBUG)

    @staticmethod
    def _to_text(raw):
        """Return *raw* as a native ``str``.

        On Python 2 the value already is a ``str`` and is returned
        unchanged; on Python 3 the bytes are decoded as latin-1, which
        maps every byte value and therefore cannot fail.
        """
        return raw if isinstance(raw, str) else raw.decode('latin-1')

    def crc_16(self, data):
        """Return the 16-bit checksum of the byte string *data*."""
        crc = 0xffff

        # bytearray yields integers on both Python 2 and Python 3
        for byte in bytearray(data):
            crc = (crc >> 8) ^ self.crc_table[(crc ^ byte) & 0xff]

        return crc ^ 0xffff

    def wrap(self, data):
        """Escape the 0x7d and 0x7e bytes of *data*."""
        # 0x7d must be escaped first, otherwise the escape bytes produced
        # for 0x7e would be escaped a second time
        data = data.replace(b'\x7d', b'\x7d\x5d')
        data = data.replace(b'\x7e', b'\x7d\x5e')
        return data

    def unwrap(self, data):
        """Reverse wrap(): turn escape sequences back into raw bytes."""
        data = data.replace(b'\x7d\x5e', b'\x7e')
        data = data.replace(b'\x7d\x5d', b'\x7d')
        return data

    def make_packet(self, data):
        """Return *data* with its CRC appended, escaped and 0x7e-terminated."""
        return self.wrap(data + pack('<H', self.crc_16(data))) + b'\x7e'

    def write(self, cmd, sub_cmd=None, data=b''):
        """Send command *cmd* with optional *sub_cmd* byte and payload."""
        # compare against None so that a sub-command of 0 is still sent
        if sub_cmd is None:
            hdr = pack('<B', cmd)
        else:
            hdr = pack('<BB', cmd, sub_cmd)

        super(DiagProt, self).write(self.make_packet(hdr + data))

    def read(self):
        """Read up to CMD_MAX_SIZE bytes from the port."""
        return super(DiagProt, self).read(self.CMD_MAX_SIZE)

    def send_verno(self):
        """Send VERNO_F and wait until recv_verno() has stored the reply.

        Returns the ``(model, revision)`` pair. Raises Exception when no
        reply was parsed within CMD_TIMEOUT seconds.
        """
        self.model, self.revision = None, None

        # send request
        self.write(self.VERNO_F)

        deadline = time.time() + self.CMD_TIMEOUT

        # recv_verno() runs in the dispatcher thread and assigns the two
        # attributes one after another, so wait until BOTH are present
        while self.model is None or self.revision is None:
            if time.time() >= deadline:
                raise Exception('VERNO_F reply timeout')

            time.sleep(self.POLL_INTERVAL)

        self.logger.info('[+] Device model: %s', self.model)
        self.logger.info('[+] Revision: %d', self.revision)

        return self.model, self.revision

    def recv_verno(self, data):
        """Parse a VERNO_F reply and store the model and revision fields."""
        if len(data) < self.VERNO_SIZE:
            # a short reply is dropped; send_verno() will then time out
            self.logger.error('[!] Truncated VERNO_F reply (%d bytes)', len(data))
            return

        fields = unpack(self.VERNO_FMT, data[: self.VERNO_SIZE])

        # see VERNO_FMT for the field order
        model, mob_firmware_rev = fields[4], fields[8]

        # the model is a fixed-width field, drop its NUL padding
        self.model = self._to_text(model).rstrip('\0')
        self.revision = mob_firmware_rev

    def send_msg_config_set_rt_mask(self, ssid_start, ssid_end, on=True):
        """Send one 32-bit mask for each ID in ``ssid_start..ssid_end``.

        Every mask is 0xffffffff when *on* is true and 0 otherwise.
        """
        count = ssid_end - ssid_start + 1
        masks = pack('<I', 0xffffffff if on else 0) * count

        # send request
        self.write(self.EXT_MSG_CONFIG_F, self.EXT_MSG_SUBCMD_SET_RT_MASK,
                   pack('<HHH', ssid_start, ssid_end, 0) + masks)

    def recv_msg_config_set_rt_mask(self, data):
        """Hook for EXT_MSG_CONFIG_F replies; ignored by default."""
        pass

    def recv_msg(self, data):
        """Parse an EXT_MSG_F packet and pass the text to handle_message().

        The packet holds a fixed header, ``num_args`` 32-bit arguments and
        two NUL-terminated strings (format string, file name). Packets are
        ignored until ``initialized`` is set; malformed packets are logged
        and dropped instead of terminating the dispatcher thread.
        """
        if not self.initialized:
            return

        if len(data) < self.MSG_HDR_SIZE:
            self.logger.error('[!] Truncated EXT_MSG_F packet (%d bytes)', len(data))
            return

        # see MSG_HDR_FMT for the field order
        header = unpack(self.MSG_HDR_FMT, data[: self.MSG_HDR_SIZE])
        num_args, line, ssid = header[1], header[4], header[5]

        body = data[self.MSG_HDR_SIZE :]
        args_size = num_args * 4

        if len(body) < args_size:
            self.logger.error('[!] Truncated EXT_MSG_F arguments (%d args)', num_args)
            return

        args = unpack('<%dI' % num_args, body[: args_size])
        strings = body[args_size :].split(b'\0')

        if len(strings) < 2:
            self.logger.error('[!] Malformed EXT_MSG_F strings (%d args)', num_args)
            return

        message = self._to_text(strings[0]).strip()
        file_name = self._to_text(strings[1]).strip()

        try:
            # the format string comes from the device and may use
            # conversions that Python's % operator rejects
            text = message % args
        except (TypeError, ValueError, OverflowError):
            self.logger.error('[!] Format string error for message "%s" (%d args)',
                              message, num_args)
            return

        # process message outside of the try block, so that errors raised
        # by the handler are not mistaken for format string errors
        self.handle_message(ssid, file_name, line, text)

    def recv_msg_terse(self, data):
        """Hook for EXT_MSG_TERSE_F packets; ignored by default."""
        pass

    def dispatch(self):
        """Read the port forever and dispatch every complete frame."""
        pending = b''

        while True:
            # read data stream, an empty result means nothing arrived
            chunk = self.read()
            if not chunk:
                continue

            frames = (pending + chunk).split(b'\x7e')

            # the last piece is an unterminated frame, keep it for later
            pending = frames.pop()

            for frame in frames:
                if len(frame) >= 3:
                    # parse individual packet
                    self.dispatch_packet(self.unwrap(frame))

    def dispatch_packet(self, packet):
        """Verify the checksum of an unescaped *packet* and handle it.

        A packet that is too short or fails the check clears ``running``.
        """
        # command byte + 2 bytes of CRC is the smallest valid packet;
        # unescaping may have shrunk the frame below that size
        if len(packet) < 3:
            self.logger.error('[!] Truncated diag packet')
            self.running = False
            return

        payload = packet[: -2]
        cmd = bytearray(payload[: 1])[0]
        crc, = unpack('<H', packet[-2 :])

        # verify packet control sum
        if self.crc_16(payload) != crc:
            self.logger.error('[!] Bad diag packet control sum')
            self.running = False
        else:
            self.handle_packet(cmd, payload[1 :])

    def handle_packet(self, cmd, data):
        """Route *data* to the handler registered for command *cmd*.

        Unknown commands are logged. An exception raised by a handler
        clears ``running`` and is re-raised.
        """
        handler = {
            self.VERNO_F: self.recv_verno,
            self.EXT_MSG_CONFIG_F: self.recv_msg_config_set_rt_mask,
            self.EXT_MSG_F: self.recv_msg,
            self.EXT_MSG_TERSE_F: self.recv_msg_terse,
        }.get(cmd)

        if handler is None:
            self.logger.error('[!] Unknown command: 0x%.2x', cmd)
            return

        try:
            handler(data)
        except Exception:
            self.running = False
            raise

    def handle_message(self, ssid, file_name, line, message):
        """Hook that receives every formatted message; no-op by default."""
        pass
class DiagSubsystemsFinder(DiagProt):
    """Probe every subsystem ID and collect the ranges the device accepts.

    Each contiguous run of accepted IDs is printed and appended to
    ``known_subsystems`` as a ``(first, last)`` tuple.
    """

    def __init__(self, device_name, output_file=None, baudrate=None):
        """Initialize the range tracking state, then open the device."""
        self.known_subsystems = []
        self.ssid_start = self.ssid_end = None

        # number of EXT_MSG_CONFIG_F replies seen so far
        self.replies = 0

        super(DiagSubsystemsFinder, self).__init__(device_name, output_file=output_file,
                                                   baudrate=baudrate)

    def recv_msg_config_set_rt_mask(self, data):
        """Track contiguous runs of subsystem IDs reported as valid.

        The third header field is taken as the subsystem ID and the
        fourth one as a count; a non-zero count marks the ID as valid.
        """
        self.replies += 1

        # ignore replies that are too short to carry the header
        if len(data) < 7:
            return

        # parse EXT_MSG_CONFIG_F header
        _, _, ssid, count = unpack('<BHHH', data[: 7])

        if count == 0:
            return

        # valid subsystem ID was found
        if self.ssid_end is None:
            # very first valid ID, open a new range
            self.ssid_start = self.ssid_end = ssid
        elif self.ssid_end + 1 == ssid:
            # ID extends the current range
            self.ssid_end = ssid
        else:
            # gap detected: report the finished range and open a new one
            self._flush_range()
            self.ssid_start = self.ssid_end = ssid

    def _flush_range(self):
        """Print and record the range collected so far, if there is one."""
        if self.ssid_end is None:
            return

        print('( 0x%.4x, 0x%.4x )' % (self.ssid_start, self.ssid_end))

        self.known_subsystems.append((self.ssid_start, self.ssid_end))
        self.ssid_start = self.ssid_end = None

    def _wait_for_replies(self, expected):
        """Wait until *expected* replies arrived or the device went quiet.

        Gives up after CMD_TIMEOUT consecutive seconds without any new
        reply, or as soon as ``running`` is cleared.
        """
        seen, idle = self.replies, 0

        while self.running and self.replies < expected and idle < self.CMD_TIMEOUT:
            time.sleep(1)

            if self.replies == seen:
                idle += 1
            else:
                seen, idle = self.replies, 0

    def start(self):
        """Run the enumeration; returns when all replies were processed."""
        DispatcherThread(self).start()

        # verify diag connection
        self.send_verno()

        self.logger.info('[+] Finding all available subsystem IDs, it may take a while...\n')

        subsys = 0

        # enumerate and test all possible subsystems
        while subsys < self.MAX_SSID:
            self.send_msg_config_set_rt_mask(subsys, subsys)
            subsys += 1

        # replies are parsed by the dispatcher thread and may still be in
        # flight here, let them arrive before reporting the final range
        self._wait_for_replies(subsys)

        # the last range has no following gap that would trigger its report
        self._flush_range()

        self.logger.info('\n[+] DONE')
class DiagLogger(DiagProt):
    """Enable messages for a fixed list of subsystems and log them."""

    # inclusive ( first, last ) subsystem ID ranges enabled by start()
    known_subsystems = [
        ( 0x0000, 0x00c8 ),
        ( 0x01f4, 0x02bc ),
        ( 0x03e8, 0x04b0 ),
        ( 0x07d0, 0x0898 ),
        ( 0x0bb8, 0x0c80 ),
        ( 0x0fa0, 0x1068 ),
        ( 0x1194, 0x12c0 ),
        ( 0x1388, 0x1450 ),
        ( 0x157c, 0x1644 ),
        ( 0x1770, 0x1838 ),
        ( 0x1964, 0x1a2c ),
        ( 0x1b58, 0x1ce8 ),
        ( 0x1f40, 0x2008 ),
        ( 0x2134, 0x21fc ),
        ( 0x2328, 0x23f0 ),
        ( 0x251c, 0x25e4 ),
        ( 0x27d8, 0x2968 ) ]

    def handle_message(self, ssid, file_name, line, message):
        """Write one message to the log at DEBUG level.

        The record is prefixed with the local time formatted according to
        TIMESTAMP, unless TIMESTAMP is None.
        """
        prefix = ''

        if TIMESTAMP is not None:
            # strftime() without a time tuple formats the current local time
            prefix = '[%s] ' % time.strftime(TIMESTAMP)

        record = '%s[%.4x] %s(%d) : %s' % (prefix, ssid, file_name, line, message)
        self.logger.debug(record)

    def start(self):
        """Start the dispatcher, check the link and enable all messages."""
        worker = DispatcherThread(self)
        worker.start()

        # make sure that the device answers before configuring it
        self.send_verno()

        # switch on messages of every known subsystem range
        for first, last in self.known_subsystems:
            self.send_msg_config_set_rt_mask(first, last)

        self.logger.info('')

        # from now on received messages are processed
        self.initialized = True
def main(argv=None):
    """Command line entry point.

    *argv* is the list of arguments to parse; None means ``sys.argv[1:]``.

    Returns the process exit status: 0 on success or when interrupted by
    the user, 1 when no device name was given or when the logger stopped
    because of an error.
    """
    option_list = [

        make_option('-d', '--device-name', dest='device_name', default=None,
            help='device name of diag interface serial port'),

        make_option('-b', '--baudrate', dest='baudrate', default=None,
            help='serial port speed', metavar='NUMBER', type='int'),

        make_option('-o', '--output-file', dest='output_file', default=None,
            help='output file to write debug messages'),

        make_option('-f', '--find-subsystems', dest='find_subsystems', default=False,
            help='find all valid subsystem IDs for this device', action='store_true') ]

    options, _ = OptionParser(option_list=option_list).parse_args(argv)

    if options.device_name is None:
        print('ERROR: Device name is not specified')

        # report the failure to the calling shell
        return 1

    if options.find_subsystems:
        diag = DiagSubsystemsFinder(options.device_name, output_file=options.output_file,
                                    baudrate=options.baudrate)
        diag.start()
        return 0

    diag = DiagLogger(options.device_name, output_file=options.output_file,
                      baudrate=options.baudrate)
    diag.start()

    try:
        # messages are printed by the dispatcher thread, just stay alive
        while diag.running:
            time.sleep(1)

    except KeyboardInterrupt:
        print('\nEXIT\n')
        return 0

    # running was cleared by the protocol layer after a failure
    return 1
if __name__ == '__main__':

    # sys.exit() is always available, whereas the bare exit() helper is
    # injected by the site module and is missing under "python -S"
    sys.exit(main())
#
# EoF
#
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment