Last active
March 15, 2024 08:40
-
-
Save dogtopus/aba02ea5abaae6d0e66552b50ba21ca9 to your computer and use it in GitHub Desktop.
Convert sigrok JSON trace containing UART HCI traffic to HCI PCAP file. Requires scapy.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from __future__ import annotations | |
''' | |
Convert sigrok JSON trace containing UART HCI traffic to HCI PCAP file. | |
The JSON trace files are typically acquired with: | |
sigrok-cli -i some-sigrok-session.sr -P uart:baudrate=<hci-baud> -A uart=tx-data:rx-data --protocol-decoder-jsontrace > some-json-trace-file.json | |
Calling with piping is also possibe: | |
sigrok-cli -i some-sigrok-session.sr -P uart:baudrate=<hci-baud> -A uart=tx-data:rx-data --protocol-decoder-jsontrace | python3 trace2pcap.py - some-pcap.pcap | |
If pins are not named as "RX" and "TX" in the sigrok session file ("RXD" and "TXD" might also work. Not sure), extra parameters may needed for -P in order for the decoder to recognize the signals. | |
Note that the trace file should only contain TX and RX channels or unexpected things may happen. Therefore the -A parameter is necessary. | |
This can probably be easily adapted to also generate USBLL PCAP files from sigrok sessions but such possibility is not explored (and Wireshark support for USBLL is still slightly inferior compare to even sigrok which is already somewhat baremetal). | |
''' | |
from typing import ( | |
TypedDict, | |
NamedTuple, | |
Tuple, | |
List, | |
Dict, | |
Iterator, | |
TextIO, | |
Optional, | |
Sequence, | |
) | |
import argparse | |
import json | |
import sys | |
import time | |
from scapy.all import wrpcap, Packet | |
from scapy.layers.bluetooth import HCI_PHDR_Hdr, HCI_Hdr | |
class TraceEvent(TypedDict): | |
ph: str | |
ts: float | |
pid: str | |
tid: str | |
name: str | |
class ByteEvent(NamedTuple): | |
byte: int | |
span_us: Tuple[float, float] | |
decoder: str | |
channel: str | |
class ChannelState(TypedDict): | |
last_byte_ends_at: float | |
pkt_starts: float | |
pkt_content: bytearray | |
EventKey = Tuple[str, str, str] | |
def parse_args(): | |
p = argparse.ArgumentParser(description='Convert sigrok JSON trace containing UART HCI traffic to HCI PCAP file.') | |
p.add_argument('jsontrace', help='Chromium JSON trace file generated with sigrok-cli using --protocol-decoder-jsontrace. Use - to read from stdin.') | |
p.add_argument('pcap', help='Output PCAP file.') | |
p.add_argument('-0', '--epoch', type=float, default=time.time(), help='Set epoch for the PCAP dump (default is now).') | |
p.add_argument('-b', '--break-threshold', type=float, default=25.0, help='Set break threshold in microseconds (max amount of time between 2 bytes for the program to consider them in different packets) (default is 25.0).') | |
p.add_argument('-d', '--decoder-name', dest='decoder_names', action='append', help='Name of the sigrok protocol decoder responsible for UART decoding. Can be specified multiple times. Defaults to including all.') | |
p.add_argument('-r', '--reverse-rxtx', '--assume-controller', dest='reverse_rxtx', action='store_true', default=False, help='Assume the captured data were seen at the controller\'s perspective (i.e. TX=controller to host and RX=host to controller).') | |
return p, p.parse_args() | |
def load_jsontrace(jsontrace_file: TextIO) -> Iterator[ByteEvent]: | |
jsontrace_obj = json.load(jsontrace_file) | |
jsontrace: List[TraceEvent] = jsontrace_obj['traceEvents'] | |
openings: Dict[EventKey, float] = {} | |
for ev in jsontrace: | |
ev_key: EventKey = ev['pid'], ev['tid'], ev['name'] | |
if ev['ph'] == 'B': | |
if ev_key in openings: | |
raise ValueError(f'Overlapping event on the same channel detected: {ev}.') | |
openings[ev_key] = ev['ts'] | |
elif ev['ph'] == 'E': | |
if ev_key not in openings: | |
raise ValueError(f'Unmatched event closing: {ev}.') | |
yield ByteEvent(int(ev['name'], base=16), (openings[ev_key], ev['ts']), ev['pid'], ev['tid']) | |
del openings[ev_key] | |
def generate_scapy_packets(events: Iterator[ByteEvent], decoder_names: Optional[Sequence[str]], break_threshold_us: float, epoch: float, reverse_rxtx: bool) -> Iterator[Packet]: | |
rxtx_mapping: Dict[str, int] = ( | |
{'RX': 1, 'TX': 0}, | |
{'RX': 0, 'TX': 1}, | |
)[reverse_rxtx] | |
channel_states: Dict[str, ChannelState] = { | |
'RX': {'last_byte_ends_at': -1, 'pkt_starts': -1, 'pkt_content': bytearray()}, | |
'TX': {'last_byte_ends_at': -1, 'pkt_starts': -1, 'pkt_content': bytearray()}, | |
} | |
for byte in events: | |
if decoder_names is not None and len(decoder_names) != 0 and byte.decoder not in decoder_names: | |
continue | |
channel_state = channel_states[byte.channel] | |
# init packet start time | |
if channel_state['pkt_starts'] < 0: | |
channel_state['pkt_starts'] = byte.span_us[0] | |
# if we have recorded the ending time for last byte and the difference | |
# with the current byte's opening time is over the break threshold, | |
# split and yield the packet | |
if channel_state['last_byte_ends_at'] >= 0 and byte.span_us[0] - channel_state['last_byte_ends_at'] >= break_threshold_us: | |
scapy_packet = HCI_PHDR_Hdr(direction=rxtx_mapping[byte.channel]) / HCI_Hdr(channel_state['pkt_content']) | |
scapy_packet.time = epoch + channel_state['pkt_starts'] / 1000 / 1000 | |
yield scapy_packet | |
channel_state['pkt_starts'] = byte.span_us[0] | |
channel_state['pkt_content'].clear() | |
# update ending time and buffer the byte | |
channel_state['pkt_content'].append(byte.byte) | |
channel_state['last_byte_ends_at'] = byte.span_us[1] | |
# handle last packets for each channel (earlier one first) | |
for channel, state in sorted(channel_states.items(), key=lambda state: state[1]['pkt_starts']): | |
if len(state['pkt_content']) != 0: | |
scapy_packet = HCI_PHDR_Hdr(direction=rxtx_mapping[channel]) / HCI_Hdr(state['pkt_content']) | |
scapy_packet.time = epoch + state['pkt_starts'] / 1000 / 1000 | |
yield scapy_packet | |
if __name__ == '__main__': | |
_, args = parse_args() | |
if args.jsontrace != '-': | |
f = open(args.jsontrace, 'r') | |
else: | |
f = sys.stdin | |
with f: | |
wrpcap(args.pcap, generate_scapy_packets(load_jsontrace(f), args.decoder_names, args.break_threshold, args.epoch, args.reverse_rxtx)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment