Skip to content

Instantly share code, notes, and snippets.

@sjmf
Last active May 3, 2026 18:48
Show Gist options
  • Select an option

  • Save sjmf/c4329fd27e403a264648bf4e7744655a to your computer and use it in GitHub Desktop.

Select an option

Save sjmf/c4329fd27e403a264648bf4e7744655a to your computer and use it in GitHub Desktop.
CH9350L lower-computer protocol PoC — kvm-serial issue #13
#!/usr/bin/env python3
"""
CH9350L UART protocol PoC — issue #13.
Two subcommands:
sniff PORT passively decode frames on a serial line. Tag with --tag
LC/UC for two-port bidirectional captures (merge logs by
timestamp afterwards).
emulate PORT drive a real CH9350L upper computer from this host: send
the attach sequence (0x86 / 0x80 / 0x89 / 0x81 ×N), then
forward keystrokes and mouse motion typed at the prompt.
Frame formats and the wire-level state machine are documented in
docs/CH9350L_PROTO.md on the feature/13-ch9350L-support branch.
Requires pyserial.
"""
import time
import threading
import argparse
import serial
HEADER = b"\x57\xAB"
# Captured HID Report Descriptors from a real CH9350L LC at attach time
# (sniff_lc.txt, 2026-05-03). These are what the LC announces over 0x81 to
# the UC so it can present matching HID descriptors to the target PC.
DEFAULT_MOUSE_DESC = bytes.fromhex(
"05010902a10185010901a1000509190129031500250175019503810275059501"
"810105010930093109381581257f750895038106c005ff09021500250175019501"
"b12275079501b101c0"
)
DEFAULT_MOUSE_PID = bytes.fromhex("4000") # id2 / port-1 PID
DEFAULT_KBD_DESC = bytes.fromhex(
"05010906a1018501050719e029e7150025017501950881029501750881019503"
"750105081901290391029505750191019506750826ff000507190029918100c0"
"05010980a10185021981298315002501950375018102950175058101c0050c09"
"01a10185031500250109e909ea09e209cd19b529b87501950881020a8a010a21"
"020a2a021a23022a270281020a83010a96010a92010a9e010a94010a060209b2"
"09b48102c0"
)
DEFAULT_KBD_PID = bytes.fromhex("0315") # id2 / port-2 PID
# ---------------------------------------------------------------------------
# Protocol constants
# ---------------------------------------------------------------------------
# Known fixed payload lengths (bytes after the cmd byte).
# 0x83/0x88 are length-prefixed (LEN byte at payload[0]) — handled separately.
PAYLOAD_LEN = {
0x82: 1, # Heartbeat: one status byte (0xA0..0xAF)
0x12: 8, # Upper-computer init/LED frame: 8 bytes after cmd
0x01: 8, # Keyboard state-2: 8-byte USB HID boot report
0x02: 4, # Mouse relative state-2: btn, dx, dy, wheel
0x04: 7, # Mouse absolute state-3/4: id, btn, xl, xh, yl, yh, wheel
0x80: 1, # Startup: 1-byte status (observed at USB device attach)
0x86: 0, # Startup: attach signal (no payload)
0x89: 0, # Periodic LC status announce (no payload)
}
# LC → UC heartbeat. Low nibble = IO pin state; a real LC sends 0xA3
# (IO0/IO1 high). 0xA0 also accepted by the UC.
HEARTBEAT_FRAME = bytes([0x57, 0xAB, 0x82, 0xA3])
# ---------------------------------------------------------------------------
# HID usage ID names (index = usage ID)
# ---------------------------------------------------------------------------
_HID_USAGE = (
"NONE ERR_OVF POST_FAIL ERR_UNDEF "
"A B C D E F G H I J K L M N O P Q R S T U V W X Y Z "
"1 2 3 4 5 6 7 8 9 0 "
"ENTER ESC BSPC TAB SPACE - = [ ] \\ # ; ' ` , . / "
"CAPS F1 F2 F3 F4 F5 F6 F7 F8 F9 F10 F11 F12 "
"PRTSC SCRLK PAUSE INS HOME PGUP DEL END PGDN "
"RIGHT LEFT DOWN UP "
"NUMLK KP/ KP* KP- KP+ KPENT KP1 KP2 KP3 KP4 KP5 KP6 KP7 KP8 KP9 KP0 KP."
).split()
MODS = {
0x01: "LCTRL", 0x02: "LSHIFT", 0x04: "LALT", 0x08: "LWIN",
0x10: "RCTRL", 0x20: "RSHIFT", 0x40: "RALT", 0x80: "RWIN",
}
def fmt_mod(b: int) -> str:
return "+".join(n for m, n in MODS.items() if b & m) or "-"
def hid_name(code: int) -> str:
if code < len(_HID_USAGE):
return _HID_USAGE[code]
return f"0x{code:02X}"
def fmt_hid(report: bytes) -> str:
"""Format an 8-byte USB HID boot keyboard report."""
if len(report) < 8:
return f"<short {report.hex()}>"
mod = report[0]
keys = [hid_name(report[i]) for i in range(2, 8) if report[i]]
return f"mod={fmt_mod(mod)} keys={keys}"
# ---------------------------------------------------------------------------
# Frame decoder
# ---------------------------------------------------------------------------
def decode_frame(cmd: int, payload: bytes) -> str:
if cmd == 0x82:
io = payload[0] & 0x0F if payload else "?"
return f"HEARTBEAT io={io}"
if cmd == 0x81:
# 57 AB 81 [ID1:1] [LEN:2 LE] [PAYLOAD] [ID2:2] [CHK:1]
if len(payload) >= 6:
id1 = payload[0]
plen = payload[1] | (payload[2] << 8)
need = 3 + plen + 3
if len(payload) >= need:
desc = payload[3:3 + plen]
id2 = payload[3 + plen:5 + plen]
chk = payload[5 + plen]
expected = (sum(desc) + sum(id2)) & 0xFF
ok = "OK" if chk == expected else f"ERR(exp {expected:02X})"
return (f"DEVICE_CONNECT id1=0x{id1:02X} len={plen} "
f"id2={id2.hex()} chk={chk:02X}{ok} desc={desc.hex()}")
return f"DEVICE_CONNECT {payload.hex()}"
if cmd in (0x83, 0x88):
tag = "PAIRED" if cmd == 0x83 else "SOLO"
if len(payload) < 4:
return f"KEY_{tag} <short: {payload.hex()}>"
plen = payload[0]
if len(payload) < 1 + plen:
return f"KEY_{tag} <incomplete len={plen} have={len(payload) - 1}: {payload.hex()}>"
ser = payload[1]
typ = payload[2]
# data = bytes between SER and CTR (includes TYPE/MOD/BTN byte where present)
data = payload[3 : 1 + plen - 2]
ctr = payload[1 + plen - 2]
ctr_sum = payload[1 + plen - 1]
expected_sum = (ctr + sum(payload[2 : 1 + plen - 2])) & 0xFF
chk_ok = "OK" if ctr_sum == expected_sum else f"ERR(exp {expected_sum:02X})"
# CH9329 keyboard: LEN=0x0C, TYPE byte=0x01 (HID report ID), then 8-byte boot report
if plen == 0x0C and typ == 0x01:
return (f"KEY_{tag} ser=0x{ser:02X} KB {fmt_hid(data)}"
f" ctr={ctr:02X} sum={ctr_sum:02X}{chk_ok}")
# CH9329 absolute mouse: LEN=0x0A, TYPE byte=0x05 (HID report ID)
if plen == 0x0A and typ == 0x05:
if len(data) >= 6:
btn, xl, xh, yl, yh, whl = data[:6]
x, y = (xh << 8) | xl, (yh << 8) | yl
return (f"KEY_{tag} ser=0x{ser:02X} MOUSE_ABS"
f" btn=0x{btn:02X} x={x} y={y} wheel={whl}"
f" ctr={ctr:02X} sum={ctr_sum:02X}{chk_ok}")
# Boot keyboard (real keyboard, HID boot protocol, no report ID): LEN=0x0B
# No TYPE byte: payload after SER is directly mod rsvd k0..k5.
# 'typ' here is the HID modifier byte; data = [rsvd, k0..k5].
if plen == 0x0B:
hid = bytes([typ]) + bytes(data) # reconstruct 8-byte boot report
return (f"KEY_{tag} ser=0x{ser:02X} KB_BOOT {fmt_hid(hid)}"
f" ctr={ctr:02X} sum={ctr_sum:02X}{chk_ok}")
# Relative mouse with report ID (LEN=0x08, SER bits 5,4 = mouse):
# TYPE=report-id (e.g. 0x01), then 4-byte boot mouse: btn dx dy wheel.
if plen == 0x08 and len(data) >= 4:
btn = data[0]
dx_raw, dy_raw, whl = data[1], data[2], data[3]
dx = dx_raw - 256 if dx_raw > 127 else dx_raw
dy = dy_raw - 256 if dy_raw > 127 else dy_raw
return (f"KEY_{tag} ser=0x{ser:02X} MOUSE_REL_RID"
f" rid=0x{typ:02X} btn=0x{btn:02X} dx={dx:+d} dy={dy:+d} wheel={whl}"
f" ctr={ctr:02X} sum={ctr_sum:02X}{chk_ok}")
# Boot relative mouse (real mouse, HID boot protocol, no report ID): LEN=0x07
# No TYPE byte: payload after SER is buttons dx dy wheel.
# 'typ' here is the HID buttons byte; data = [dx, dy, wheel].
if plen == 0x07 and len(data) >= 3:
btn = typ
dx_raw, dy_raw, whl = data[0], data[1], data[2]
dx = dx_raw - 256 if dx_raw > 127 else dx_raw
dy = dy_raw - 256 if dy_raw > 127 else dy_raw
return (f"KEY_{tag} ser=0x{ser:02X} MOUSE_REL"
f" btn=0x{btn:02X} dx={dx:+d} dy={dy:+d} wheel={whl}"
f" ctr={ctr:02X} sum={ctr_sum:02X}{chk_ok}")
return (f"KEY_{tag} ser=0x{ser:02X} typ=0x{typ:02X} data={data.hex()}"
f" ctr={ctr:02X} sum={ctr_sum:02X}{chk_ok}")
if cmd == 0x01:
return f"KEYBOARD {fmt_hid(payload)}"
if cmd == 0x02:
if len(payload) >= 4:
btn, dx, dy, whl = payload[0], payload[1], payload[2], payload[3]
dx = dx - 256 if dx > 127 else dx
dy = dy - 256 if dy > 127 else dy
return f"MOUSE_REL btn=0x{btn:02X} dx={dx:+d} dy={dy:+d} wheel={whl}"
return f"MOUSE_REL {payload.hex()}"
if cmd == 0x04:
if len(payload) >= 7:
btn, xl, xh, yl, yh, whl = payload[1:7] # payload[0] = report ID
x, y = (xh << 8) | xl, (yh << 8) | yl
return f"MOUSE_ABS btn=0x{btn:02X} x={x} y={y} wheel={whl}"
return f"MOUSE_ABS {payload.hex()}"
if cmd == 0x12:
# UC keep-alive (datasheet §4.10 + observed): 8 bytes payload.
# [P1:2 LE] [P2:2 LE] [LED:1] [STATUS:1] [VERSION:2]
# P1/P2 mirror the PID field of the LC's 0x81 frames once accepted.
# LED bits: 0=NumLk 1=CapsLk 2=ScrLk; 0xFF observed when the target
# host hasn't reported LED state yet (pre-enumeration default).
# STATUS bits (empirical, see CH9350L_PROTO.md):
# bit 0 = port-0 device enumerated on target USB
# bit 1 = port-1 device enumerated on target USB
# bit 2 = UART link healthy / UC alive
# 0x07 = both devices live on target (HID forwarding works).
# 0x04 = UART up but target hasn't enumerated devices yet.
# 0xFF = transient/error during disconnect.
if len(payload) >= 8:
p1 = payload[0:2].hex()
p2 = payload[2:4].hex()
led = payload[4]
status = payload[5]
ver = payload[6:8].hex()
led_flags = [n for b, n in ((0x01, "NumLk"), (0x02, "CapsLk"),
(0x04, "ScrLk")) if led & b]
st_flags = [n for b, n in ((0x01, "p0enum"), (0x02, "p1enum"),
(0x04, "uart")) if status & b]
return (f"UC_KEEPALIVE p1={p1} p2={p2} "
f"led=0x{led:02X}[{','.join(led_flags) or '-'}] "
f"status=0x{status:02X}[{','.join(st_flags) or '-'}] "
f"ver={ver}")
return f"UC_KEEPALIVE {payload.hex()}"
if cmd == 0x87:
return f"GET_VERSION {payload.hex()}"
if cmd == 0x86:
# Attach (followed by 0x80/0x89/0x81) or disconnect (bare). Same opcode.
return "DEVICE_NOTIFY []"
if cmd == 0x89:
return "STATUS_ANNOUNCE []"
if cmd == 0x80:
val = f"0x{payload[0]:02X}" if payload else "?"
return f"STARTUP_STATUS [{val}]"
return f"0x{cmd:02X} [{payload.hex()}]"
# ---------------------------------------------------------------------------
# Sniff mode
# ---------------------------------------------------------------------------
def _ts() -> str:
t = time.time()
us = int((t - int(t)) * 1_000_000)
return f"{time.strftime('%H:%M:%S', time.localtime(t))}.{us:06d}"
def _parse_buf(buf: bytearray) -> tuple[list[tuple[int, bytes]], bytearray]:
"""
Extract complete frames from buf.
Returns (frames, remaining_buf) where frames = [(cmd, payload), ...]
"""
frames = []
while True:
idx = buf.find(HEADER)
if idx == -1:
# No header: keep the last byte in case it starts a header
buf = bytearray(buf[-1:]) if buf else bytearray()
break
if idx > 0:
# Pre-header bytes — emit as a synthetic (-1, skipped) entry so
# the caller can log them; they're usually framing noise from a
# mid-frame sniff start or a corrupted byte.
skipped = bytes(buf[:idx])
buf = buf[idx:]
frames.append((-1, skipped))
if len(buf) < 3:
break # need at least header + cmd
cmd = buf[2]
if cmd in (0x83, 0x88):
# Length-prefixed: payload[0] = LEN, total payload = 1 + LEN bytes
if len(buf) < 4:
break # need at least header + cmd + LEN
plen = buf[3]
needed = 4 + plen # header(2) + cmd(1) + LEN(1) + plen bytes
if len(buf) < needed:
break
payload = bytes(buf[3:needed])
buf = buf[needed:]
frames.append((cmd, payload))
continue
if cmd == 0x81:
# Device Connection: 57 AB 81 [ID1] [LEN_LO LEN_HI] [PAYLOAD] [ID2_LO ID2_HI] [CHK]
if len(buf) < 6:
break # need header + cmd + ID1 + 2-byte LEN
plen = buf[4] | (buf[5] << 8)
needed = 3 + 1 + 2 + plen + 2 + 1 # = 9 + plen
if len(buf) < needed:
break
payload = bytes(buf[3:needed])
buf = buf[needed:]
frames.append((cmd, payload))
continue
known_len = PAYLOAD_LEN.get(cmd)
if known_len is not None:
needed = 3 + known_len
if len(buf) < needed:
break # wait for more data
payload = bytes(buf[3:needed])
buf = buf[needed:]
frames.append((cmd, payload))
else:
# Unknown cmd: scan for next header to bound the frame
next_hdr = buf.find(HEADER, 3)
if next_hdr == -1:
if len(buf) > 128:
payload = bytes(buf[3:])
frames.append((cmd, payload))
buf = bytearray()
else:
break
else:
payload = bytes(buf[3:next_hdr])
frames.append((cmd, payload))
buf = buf[next_hdr:]
return frames, buf
def sniff(port_name: str, baud: int = 115200, logpath: str | None = None,
tag: str | None = None):
log = open(logpath, "w", encoding="utf-8") if logpath else None
prefix = f"[{tag}] " if tag else ""
def emit(msg: str):
print(msg)
if log:
log.write(msg + "\n")
log.flush()
emit(f"[sniff{f' {tag}' if tag else ''}] Opening {port_name} @ {baud} 8N1"
+ (f" log={logpath}" if log else "") + "\n")
try:
with serial.Serial(port_name, baud, timeout=0.05) as ser:
buf = bytearray()
try:
while True:
chunk = ser.read(256)
if chunk:
buf.extend(chunk)
frames, buf = _parse_buf(buf)
for cmd, payload in frames:
if cmd == -1:
emit(f"{_ts()} {prefix}[sync: skipped {len(payload)}B: {payload.hex()}]")
else:
raw = HEADER + bytes([cmd]) + payload
emit(f"{_ts()} {prefix}{raw.hex(' ')} | {decode_frame(cmd, payload)}")
except KeyboardInterrupt:
emit("\n[sniff] Stopped.")
finally:
if log:
log.close()
# ---------------------------------------------------------------------------
# Emulate mode (lower-computer state machine)
# ---------------------------------------------------------------------------
class LowerComputer:
"""
CH9350L lower-computer state machine.
Drives the attach sequence (0x86 → 0x80 ×2 → heartbeats → 0x89 → 0x81
descriptors), maintains the per-SER frame counters, and gates the
state-0 → state-1 transition on UC acknowledgement of every announced
PID. CMD = 0x88 in state 0, 0x83 in state 1 — payload is identical.
State-2 mode (--state2) is an alternative, handshake-free dipswitch
configuration on the UC side (S0=LOW, S1=HIGH); it sends fixed-length
`57 AB 01 ...` keyboard / `57 AB 02 ...` mouse frames with no SER, no
counter, and no descriptor announce. Documented from the datasheet
only — not verified empirically.
See docs/CH9350L_PROTO.md for full frame formats.
"""
STATE0 = 0
STATE1 = 1
# State-0/1 frame SER + report-ID bytes. Each SER must be consistent
# with the HID Report Descriptor announced via 0x81 — RID values must
# match Report ID items in the descriptor.
#
# Default (matches DEFAULT_*_DESC): keyboard report-ID=0x01 boot kbd,
# mouse report-ID=0x01 relative.
KB_SER = 0x13 # keyboard / HID / port 2
KB_RID = 0x01 # 8-byte boot keyboard report follows
MOU_SER = 0x22 # mouse / HID / port 1
MOU_RID = 0x01 # 4-byte relative mouse report follows
# Boot-protocol format (no report ID — used when the device's descriptor
# contains no Report ID item; --boot-hid selects this path).
KB_SER_BOOT = 0x11
MOU_SER_BOOT = 0x20
def __init__(self, port: serial.Serial, state2_mode: bool = False,
boot_hid: bool = False,
mouse_desc: bytes | None = None, mouse_pid: bytes | None = None,
kbd_desc: bytes | None = None, kbd_pid: bytes | None = None):
self.port = port
self.state2_mode = state2_mode
self.boot_hid = boot_hid
self.mouse_desc = mouse_desc
self.mouse_pid = mouse_pid or b"\x00\x00"
self.kbd_desc = kbd_desc
self.kbd_pid = kbd_pid or b"\x00\x00"
self.state = self.STATE0
self._stop = threading.Event()
self._last_hb = 0.0
self._hb_interval = 1.0
# Frame counters are per-SER (datasheet §4.3 / observed): keyboard
# and mouse increment independently.
self._kbd_counter = 0
self._mou_counter = 0
# Sync gates — set by _rx_loop when UC keep-alive is observed and
# when each announced PID is reflected back. Used to defer 0x81
# send until UC is up, and to drive periodic 0x81 retransmit while
# un-acked.
self._uc_seen = threading.Event()
self._uc_p1 = b"\x00\x00"
self._uc_p2 = b"\x00\x00"
self._uc_status = 0x00 # STATUS byte from latest 0x12
# Set when rx_loop detects PID-drop (UC re-attached to target);
# the maintenance thread picks this up and replays the full
# attach sequence (0x86 → 0x80 → 0x89 → 0x81). PID-only
# retransmit is insufficient: the UC needs 0x86/0x80 to re-
# present its USB-device side to the target host.
self._reattach_needed = threading.Event()
self._last_mouse_announce = 0.0
self._last_kbd_announce = 0.0
self._announce_retry_interval = 2.0
# Serialise port writes — REPL, rx_loop responses, and the
# maintenance thread all call _send concurrently.
self._tx_lock = threading.Lock()
def _build_device_connect_frame(self, descriptor: bytes,
port_id: int, device_pid: bytes) -> bytes:
"""
Build a 0x81 Device Connection Frame:
57 AB 81 [PORT:1] [LEN:2 LE] [DESCRIPTOR] [PID:2] [CHK:1]
CHK = (sum(DESCRIPTOR) + sum(PID)) & 0xFF
DESCRIPTOR is the USB HID Report Descriptor of the device the LC has
enumerated. PID is a 2-byte identifier reflected back in the UC's
0x12 keep-alive once the descriptor has been processed.
"""
if len(device_pid) != 2:
raise ValueError("device_pid must be 2 bytes")
plen = len(descriptor)
chk = (sum(descriptor) + sum(device_pid)) & 0xFF
return (HEADER + bytes([0x81, port_id, plen & 0xFF, (plen >> 8) & 0xFF])
+ descriptor + device_pid + bytes([chk]))
def _send(self, data: bytes):
with self._tx_lock:
self.port.write(data)
print(f"{_ts()} TX {data.hex(' ')}")
def _heartbeat(self):
self._send(HEARTBEAT_FRAME)
self._last_hb = time.time()
def _send_announce(self):
"""Send a 0x89 status announce. Real LC emits these ~3 times during
attach (see CH9350L_PROTO.md §Status Announce); the emulator sends
one as part of the attach sequence."""
self._send(HEADER + bytes([0x89]))
def _announce_descriptors(self):
"""Send 0x81 Device Connection Frames for each configured device.
Used both for the initial announce and for retransmits."""
now = time.time()
if self.mouse_desc:
self._send(self._build_device_connect_frame(
self.mouse_desc, port_id=0x00, device_pid=self.mouse_pid))
self._last_mouse_announce = now
time.sleep(0.05)
if self.kbd_desc:
self._send(self._build_device_connect_frame(
self.kbd_desc, port_id=0x01, device_pid=self.kbd_pid))
self._last_kbd_announce = now
time.sleep(0.05)
def _maybe_retransmit_descriptors(self, now: float):
"""Retransmit any 0x81 whose PID isn't yet reflected in the UC's
keep-alive. Matches real-LC behaviour (~2 s retry) and recovers
from UC restart or transient UART loss mid-session."""
if self.mouse_desc and self._uc_p1 != self.mouse_pid:
if now - self._last_mouse_announce >= self._announce_retry_interval:
self._send(self._build_device_connect_frame(
self.mouse_desc, port_id=0x00, device_pid=self.mouse_pid))
self._last_mouse_announce = now
if self.kbd_desc and self._uc_p2 != self.kbd_pid:
if now - self._last_kbd_announce >= self._announce_retry_interval:
self._send(self._build_device_connect_frame(
self.kbd_desc, port_id=0x01, device_pid=self.kbd_pid))
self._last_kbd_announce = now
def _build_key_frame_state01(self, modifier: int, keycodes: list[int]) -> bytes:
"""
State-0/1 keyboard frame.
Default (LEN=0x0C, SER=0x13): RID=0x01 prefix + 8-byte HID boot report.
Boot format (--boot-hid, LEN=0x0B, SER=0x11): no RID prefix.
"""
cmd = 0x83 if self.state == self.STATE1 else 0x88
keys = (keycodes + [0] * 6)[:6]
if self.boot_hid:
hid = bytes([modifier, 0x00] + keys)
ser = self.KB_SER_BOOT
else:
hid = bytes([self.KB_RID, modifier, 0x00] + keys)
ser = self.KB_SER
ctr = self._kbd_counter & 0xFF
self._kbd_counter += 1
ctr_sum = (ctr + sum(hid)) & 0xFF
plen = 1 + len(hid) + 2 # SER + hid + CTR + CTR_SUM
payload = bytes([plen, ser]) + hid + bytes([ctr, ctr_sum])
return HEADER + bytes([cmd]) + payload
def _build_key_frame_state2(self, modifier: int, keycodes: list[int]) -> bytes:
"""State-2 keyboard frame: 57 AB 01 [8-byte HID boot report]"""
keys = (keycodes + [0] * 6)[:6]
report = bytes([modifier, 0x00] + keys)
return HEADER + bytes([0x01]) + report
def _build_mouse_frame_state01(self, btn: int, dx: int, dy: int, wheel: int) -> bytes:
"""
State-0/1 relative mouse frame.
Default (LEN=0x08, SER=0x22): RID=0x01 prefix + 4-byte boot mouse
[btn dx dy wheel]. Matches DEFAULT_MOUSE_DESC.
Boot format (--boot-hid, LEN=0x07, SER=0x20): no RID prefix.
dx/dy are signed deltas clamped to ±127.
"""
cmd = 0x83 if self.state == self.STATE1 else 0x88
dx = max(-127, min(127, dx)) & 0xFF
dy = max(-127, min(127, dy)) & 0xFF
if self.boot_hid:
data = bytes([btn, dx, dy, wheel & 0xFF])
ser = self.MOU_SER_BOOT
else:
data = bytes([self.MOU_RID, btn, dx, dy, wheel & 0xFF])
ser = self.MOU_SER
ctr = self._mou_counter & 0xFF
self._mou_counter += 1
ctr_sum = (ctr + sum(data)) & 0xFF
plen = 1 + len(data) + 2 # SER + data + CTR + CTR_SUM
payload = bytes([plen, ser]) + data + bytes([ctr, ctr_sum])
return HEADER + bytes([cmd]) + payload
def _build_mouse_frame_state2(self, btn: int, dx: int, dy: int, wheel: int) -> bytes:
"""State-2 relative mouse frame: 57 AB 02 btn dx dy wheel"""
dx = max(-127, min(127, dx)) & 0xFF
dy = max(-127, min(127, dy)) & 0xFF
return HEADER + bytes([0x02, btn & 0xFF, dx, dy, wheel & 0xFF])
def send_key(self, modifier: int, keycodes: list[int]):
if self.state2_mode:
frame = self._build_key_frame_state2(modifier, keycodes)
else:
frame = self._build_key_frame_state01(modifier, keycodes)
self._send(frame)
def send_key_release(self):
self.send_key(0, [])
def send_mouse(self, btn: int, dx: int, dy: int, wheel: int = 0):
"""Send a relative mouse frame. dx/dy are signed 8-bit deltas."""
if self.state2_mode:
frame = self._build_mouse_frame_state2(btn, dx, dy, wheel)
else:
frame = self._build_mouse_frame_state01(btn, dx, dy, wheel)
self._send(frame)
def _run_attach_sequence(self, *, wait_for_uc: bool):
"""Replay the LC→UC attach sequence:
0x86 → 0x80 0xFF (×2) → heartbeat → 0x89 → 0x81 ×N
Used both at startup (wait_for_uc=True: defer 0x81 until we
observe a UC keep-alive) and on reconnect (wait_for_uc=False:
UC is already keep-aliving, but its USB device side dropped
and needs 0x86/0x80 to re-present to the target host).
"""
self._send(HEADER + bytes([0x86]))
time.sleep(0.25)
self._send(HEADER + bytes([0x80, 0xFF]))
time.sleep(0.23)
self._send(HEADER + bytes([0x80, 0xFF]))
self._heartbeat()
time.sleep(1.0)
self._send_announce()
if not (self.mouse_desc or self.kbd_desc):
return
if wait_for_uc and not self._uc_seen.is_set():
print("[emulate] Waiting for UC keep-alive before announcing devices...")
while not self._uc_seen.wait(timeout=self._hb_interval):
self._heartbeat()
self._announce_descriptors()
def _tx_maint_loop(self):
"""Background thread: heartbeat, 0x81 retransmit, and full
attach replay on reconnect. Decoupled from the REPL so a blocked
input() prompt doesn't starve the UC of heartbeats or stall
reconnect handling."""
while not self._stop.is_set():
if self._reattach_needed.is_set():
self._reattach_needed.clear()
print(f"{_ts()} ** Reattach: replaying attach sequence **")
self._run_attach_sequence(wait_for_uc=False)
continue
now = time.time()
if now - self._last_hb >= self._hb_interval:
self._heartbeat()
self._maybe_retransmit_descriptors(now)
self._stop.wait(timeout=0.2)
def _rx_loop(self):
"""Background thread: read + decode incoming frames from upper computer."""
buf = bytearray()
while not self._stop.is_set():
try:
chunk = self.port.read(64)
except serial.SerialException:
break
if chunk:
buf.extend(chunk)
frames, buf = _parse_buf(buf)
for cmd, payload in frames:
if cmd == -1:
print(f"{_ts()} RX [sync skip {len(payload)}B]")
continue
print(f"{_ts()} RX {(HEADER + bytes([cmd]) + payload).hex(' ')} | {decode_frame(cmd, payload)}")
if cmd == 0x12 and len(payload) >= 4:
# UC keep-alive observed → cleared to send 0x81
# frames. Track current PIDs so the main loop can
# drive retransmits and the state-1 gate.
self._uc_p1 = bytes(payload[0:2])
self._uc_p2 = bytes(payload[2:4])
if len(payload) >= 6:
self._uc_status = payload[5]
if not self._uc_seen.is_set():
print(f"{_ts()} ** UC keep-alive observed — "
f"announce phase cleared **")
self._uc_seen.set()
want_p1 = self.mouse_pid if self.mouse_desc else b"\x00\x00"
want_p2 = self.kbd_pid if self.kbd_desc else b"\x00\x00"
any_announced = self.mouse_desc or self.kbd_desc
all_acked = (self._uc_p1 == want_p1 and self._uc_p2 == want_p2)
if (self.state == self.STATE0 and any_announced
and all_acked):
print(f"{_ts()} ** UC acknowledged all PIDs — "
f"entering state 1 **")
self.state = self.STATE1
elif self.state == self.STATE1 and not all_acked:
# UC dropped a PID — likely a USB-side
# disconnect/reconnect on the target. Revert
# to state 0 and trigger the maintenance
# thread to replay the full attach sequence
# (0x81 alone won't make the UC re-enumerate
# to the target — STATUS stays at 0x04).
print(f"{_ts()} ** UC PIDs no longer match "
f"(p1={self._uc_p1.hex()} p2={self._uc_p2.hex()}) "
f"— reverting to state 0, triggering reattach **")
self.state = self.STATE0
self._reattach_needed.set()
def run_interactive(self):
"""
Run the attach sequence and start an interactive REPL on stdin.
Commands:
<hex>[,<hex>...] press one or more keys by HID usage ID
SHIFT+<hex> with left-shift modifier (also CTRL+, ALT+)
m DX DY [B] [W] relative mouse delta, optional button + wheel
r release all keys
q quit
"""
rx = threading.Thread(target=self._rx_loop, daemon=True)
rx.start()
# Initial attach: 0x86 → 0x80 ×2 → 0x89 → wait for UC keep-alive
# → 0x81 descriptors. The 0x81 frames carry HID Report Descriptors
# for each connected device; without them the UC enumerates a
# default device whose report IDs don't match the 0x83/0x88 key
# frames and input is silently dropped on the target.
self._run_attach_sequence(wait_for_uc=True)
if not (self.mouse_desc or self.kbd_desc):
print("[emulate] No 0x81 descriptors configured — UC will not "
"enumerate matching HID devices. Use --mouse-desc / "
"--kbd-desc to supply captured payloads, or rely on the "
"built-in defaults.")
self._heartbeat()
# Hand heartbeat + retransmit off to a daemon thread so a blocked
# input() prompt doesn't starve the UC of 0x82 (which causes it to
# stop sending 0x12 keep-alives), and so reconnect retransmits
# don't have to wait for the user to press Enter.
threading.Thread(target=self._tx_maint_loop, daemon=True).start()
print("\n[emulate] Ready. Type 'q' Enter to quit, see docstring for commands.\n")
try:
while True:
line = input("> ").strip()
if not line:
continue
if line.lower() == "q":
break
if line.lower() == "r":
self.send_key_release()
continue
# Mouse command: m DX DY [btn=0] [wheel=0]
if line.lower().startswith("m "):
parts = line.split()
try:
dx = int(parts[1])
dy = int(parts[2])
btn = int(parts[3]) if len(parts) > 3 else 0
wheel = int(parts[4]) if len(parts) > 4 else 0
self.send_mouse(btn, dx, dy, wheel)
except (IndexError, ValueError):
print(" Usage: m DX DY [btn=0] [wheel=0]")
continue
# Keyboard command
lower = line.lower()
modifier = 0
if lower.startswith("shift+"):
modifier = 0x02
line = line[6:]
elif lower.startswith("ctrl+"):
modifier = 0x01
line = line[5:]
elif lower.startswith("alt+"):
modifier = 0x04
line = line[4:]
try:
keycodes = [int(c.strip(), 16) for c in line.split(",")]
self.send_key(modifier, keycodes)
time.sleep(0.05)
self.send_key_release()
except ValueError:
print(f" Bad input — key: hex usage IDs e.g. 04,05 | mouse: m X Y [btn] [wheel]")
except (KeyboardInterrupt, EOFError):
pass
finally:
self._stop.set()
print("\n[emulate] Stopped.")
def emulate(port_name: str, baud: int = 115200, state2: bool = False,
boot_hid: bool = False,
mouse_desc: bytes | None = None, mouse_pid: bytes | None = None,
kbd_desc: bytes | None = None, kbd_pid: bytes | None = None):
parts = ["state-2 (no pairing)" if state2 else "state-0/1 (heartbeat + pairing)"]
if boot_hid:
parts.append("boot-HID (SER=0x11/0x20, no TYPE byte)")
if mouse_desc:
parts.append(f"mouse-desc={len(mouse_desc)}B pid={mouse_pid.hex() if mouse_pid else '0000'}")
if kbd_desc:
parts.append(f"kbd-desc={len(kbd_desc)}B pid={kbd_pid.hex() if kbd_pid else '0000'}")
print(f"[emulate] Opening {port_name} @ {baud} 8N1 mode={', '.join(parts)}")
with serial.Serial(port_name, baud, timeout=0.05) as ser:
lc = LowerComputer(ser, state2_mode=state2, boot_hid=boot_hid,
mouse_desc=mouse_desc, mouse_pid=mouse_pid,
kbd_desc=kbd_desc, kbd_pid=kbd_pid)
lc.run_interactive()
def _hex_arg(s: str | None) -> bytes | None:
return bytes.fromhex(s.replace(" ", "")) if s else None
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
ap = argparse.ArgumentParser(description="CH9350L protocol PoC (issue #13)")
sub = ap.add_subparsers(dest="cmd", required=True)
sp = sub.add_parser("sniff", help="Decode incoming CH9350L frames")
sp.add_argument("port", help="Serial port (e.g. COM4)")
sp.add_argument("--baud", type=int, default=115200)
sp.add_argument("--log", metavar="FILE", help="Also write output to FILE (UTF-8)")
sp.add_argument("--tag", metavar="LABEL",
help="Prefix every line with [LABEL] (e.g. LC or UC) for two-port captures")
ep = sub.add_parser("emulate", help="Lower-computer state machine")
ep.add_argument("port", help="Serial port connected to upper computer")
ep.add_argument("--baud", type=int, default=115200)
ep.add_argument("--state2", action="store_true",
help="Use state-2 mode (simpler fixed frames, requires upper computer S0=LOW)")
ep.add_argument("--boot-hid", action="store_true",
help="Use boot-protocol HID frame format (SER=0x11/0x20, no TYPE byte)")
ep.add_argument("--mouse-desc", metavar="HEX",
help="HID Report Descriptor for mouse 0x81 frame (default: captured)")
ep.add_argument("--mouse-pid", metavar="HEX", default="4000",
help="2-byte port-1 PID for mouse 0x81 (default: 4000)")
ep.add_argument("--kbd-desc", metavar="HEX",
help="HID Report Descriptor for keyboard 0x81 frame (default: captured)")
ep.add_argument("--kbd-pid", metavar="HEX", default="0315",
help="2-byte port-2 PID for keyboard 0x81 (default: 0315)")
ep.add_argument("--no-default-desc", action="store_true",
help="Do not send any 0x81 frames unless explicitly supplied")
args = ap.parse_args()
if args.cmd == "sniff":
sniff(args.port, args.baud, args.log, args.tag)
return
# State-2 mode bypasses the descriptor-announce protocol entirely;
# combining it with --mouse-desc / --kbd-desc / --no-default-desc is
# incoherent.
if args.state2 and (args.mouse_desc or args.kbd_desc or args.no_default_desc):
ap.error("--state2 cannot be combined with descriptor flags "
"(--mouse-desc, --kbd-desc, --no-default-desc)")
if args.state2 or args.no_default_desc:
mouse_desc = _hex_arg(args.mouse_desc)
kbd_desc = _hex_arg(args.kbd_desc)
else:
mouse_desc = _hex_arg(args.mouse_desc) or DEFAULT_MOUSE_DESC
kbd_desc = _hex_arg(args.kbd_desc) or DEFAULT_KBD_DESC
emulate(args.port, args.baud, args.state2, args.boot_hid,
mouse_desc=mouse_desc, mouse_pid=_hex_arg(args.mouse_pid),
kbd_desc=kbd_desc, kbd_pid=_hex_arg(args.kbd_pid))
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment