-
-
Save minkione/909251842055a059f3a62060f57c56a4 to your computer and use it in GitHub Desktop.
A python script to coalesce output from `socat -x` hexdumps.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import argparse | |
import datetime | |
import datetime | |
import re | |
class TimeDeltaType(object): | |
""" | |
Interprets a string as a timedelta for argument parsing. | |
With no default unit: | |
>>> tdtype = TimeDeltaType() | |
>>> tdtype('5s') | |
datetime.timedelta(0, 5) | |
>>> tdtype('5.5s') | |
datetime.timedelta(0, 5, 500000) | |
>>> tdtype('5:06:07:08s') | |
datetime.timedelta(5, 22028) | |
>>> tdtype('5d06h07m08s') | |
datetime.timedelta(5, 22028) | |
>>> tdtype('5d') | |
datetime.timedelta(5) | |
With a default unit of minutes: | |
>>> tdmins = TimeDeltaType('m') | |
>>> tdmins('5s') | |
datetime.timedelta(0, 5) | |
>>> tdmins('5') | |
datetime.timedelta(0, 300) | |
>>> tdmins('6:05') | |
datetime.timedelta(0, 21900) | |
And some error cases: | |
>>> tdtype('5') | |
Traceback (most recent call last): | |
... | |
ValueError: Cannot infer units for '5' | |
>>> tdtype('5:5d') | |
Traceback (most recent call last): | |
... | |
ValueError: Colon not handled for unit 'd' | |
>>> tdtype('5:5ms') | |
Traceback (most recent call last): | |
... | |
ValueError: Colon not handled for unit 'ms' | |
>>> tdtype('5q') | |
Traceback (most recent call last): | |
... | |
ValueError: Unknown unit: 'q' | |
""" | |
units = { | |
'd': datetime.timedelta(days=1), | |
'h': datetime.timedelta(seconds=60 * 60), | |
'm': datetime.timedelta(seconds=60), | |
's': datetime.timedelta(seconds=1), | |
'ms': datetime.timedelta(microseconds=1000), | |
} | |
colon_mult_ind = ['h', 'm', 's'] | |
colon_mults = [24, 60, 60] | |
unit_re = re.compile(r'[^\d:.,-]+', re.UNICODE) | |
def __init__(self, default_unit=None): | |
self.default_unit = default_unit | |
def __call__(self, val): | |
res = datetime.timedelta() | |
for num, unit in self._parse(val): | |
unit = unit.lower() | |
if ':' in num: | |
try: | |
colon_mults = self.colon_mults[:self.colon_mult_ind.index(unit) + 1] | |
except ValueError: | |
raise ValueError('Colon not handled for unit %r' % unit) | |
else: | |
colon_mults = [] | |
try: | |
unit = self.units[unit] | |
except KeyError: | |
raise ValueError('Unknown unit: %r' % unit) | |
mult = 1 | |
for part in reversed(num.split(':')): | |
res += self._mult_td(unit, (float(part) if '.' in part else int(part)) * mult) | |
if colon_mults: | |
mult *= colon_mults.pop() | |
return res | |
def _parse(self, val): | |
pairs = [] | |
start = 0 | |
for match in self.unit_re.finditer(val): | |
num = val[start:match.start()] | |
unit = match.group() | |
pairs.append((num, unit)) | |
start = match.end() | |
num = val[start:] | |
if num: | |
if pairs or self.default_unit is None: | |
raise ValueError('Cannot infer units for %r' % num) | |
else: | |
pairs.append((num, self.default_unit)) | |
return pairs | |
@staticmethod | |
def _mult_td(td, mult): | |
# Necessary because timedelta * float is not supported: | |
return datetime.timedelta(days=td.days * mult, seconds=td.seconds * mult, microseconds=td.microseconds * mult) | |
def mkdelta(str): | |
return TimeDeltaType()(str) | |
class hexdump: | |
def __init__(self, buf, asc=False): | |
self.buf = buf | |
self.asc = asc | |
def asciify(self, bs): | |
if not self.asc: | |
return "" | |
else: | |
return " {:16}".format( | |
"".join((chr(x) if 32 <= x < 127 else "." for x in bs))) | |
def __iter__(self): | |
last_bs, last_line = None, None | |
for i in range(0, len(self.buf), 16): | |
bs = bytearray(self.buf[i : i + 16]) | |
line = " {:23} {:23}{}".format( | |
" ".join(("{:02x}".format(x) for x in bs[:8])), | |
" ".join(("{:02x}".format(x) for x in bs[8:])), | |
self.asciify(bs), | |
) | |
if bs == last_bs: | |
line = "*" | |
if bs != last_bs or line != last_line: | |
yield line | |
last_bs, last_line = bs, line | |
def __str__(self): | |
return "\n".join(self) | |
def __repr__(self): | |
return "\n".join(self) | |
class Record: | |
direction = { ">":">", "<":"<" } | |
quiet = False | |
asc = True | |
def __init__(self, line): | |
parts = line.split() | |
self.direction=parts[0] | |
self.time = datetime.datetime.strptime(" ".join(parts[1:3]), '%Y/%m/%d %H:%M:%S.%f') | |
self.length = int(parts[3].split("=")[1]) | |
self.start = int(parts[4].split("=")[1]) | |
self.to = int(parts[5].split("=")[1]) | |
self.data = bytearray() | |
def append_data(self, str): | |
str = str[1:] | |
if len(str) > 48: | |
str = str[:48] | |
self.data += bytes.fromhex(str) | |
def append_bytes(self, data): | |
self.data += data | |
self.length += len(data) | |
self.to += len(data) | |
def validate(self): | |
assert len(self.data) == self.length, f"{len(self.data)} != {self.length}" | |
assert self.length == self.to - self.start + 1, f"{self.length} != {self.to} - {self.start} + 1" | |
def __repr__(self): | |
ret = Record.direction[self.direction] | |
if not Record.quiet: | |
ret += f" {self.time.strftime('%Y/%m/%d %H:%M:%S.%f')} length={self.length} from={self.start} to={self.to}" | |
ret += "\n" + str(hexdump(self.data, Record.asc)) + "\n" + "--" | |
return ret | |
def init_argparse() -> argparse.ArgumentParser: | |
parser = argparse.ArgumentParser( | |
usage="%(prog)s [OPTION] [FILE]...", | |
description="Pipeline to post-process output of 'socat -x' in various ways.", | |
) | |
parser.add_argument( | |
"-v", "--version", action="version", | |
version=f"{parser.prog} version 1.0.0" | |
) | |
parser.add_argument( | |
"-c", "--coalesce", help="Coalesce data travelling in the same direction, within the specified duration", | |
type=mkdelta, default=mkdelta("0s")) | |
parser.add_argument( | |
"-l", "--left", help="Provide a description for data travelling from left to right '>'") | |
parser.add_argument( | |
"-r", "--right", help="Provide a description for data travelling from right to left '<'") | |
parser.add_argument( | |
"-i", "--incremental", help="Change timing information to incremental i.e. time difference from previous packet") | |
parser.add_argument( | |
"-q", "--quiet", action="store_true", help="Reduce the verbosity of the output to help align diffs") | |
parser.add_argument( | |
"-x", "--switch", action="store_true", help="Switch indicated direction of communications") | |
parser.add_argument( | |
"--dump-left", help="Saves the bytes of the left direction to the designated file. Affected by '-x'") | |
parser.add_argument( | |
"--dump-right", help="Saves the bytes of the right direction to the designated file. Affected by '-x'") | |
return parser | |
def main() -> None: | |
parser = init_argparse() | |
args = parser.parse_args() | |
dump_left = None | |
dump_right = None | |
if args.left: | |
Record.direction[">"] = args.left | |
if args.right: | |
Record.direction["<"] = args.right | |
if args.quiet: | |
Record.quiet = args.quiet | |
if args.dump_left: | |
dump_left = open(args.dump_left, "wb") | |
if args.dump_right: | |
dump_right = open(args.dump_right, "wb") | |
if args.switch: | |
tmp = Record.direction[">"] | |
Record.direction[">"] = Record.direction["<"] | |
Record.direction["<"] = tmp | |
tmp = dump_left | |
dump_left = dump_right | |
dump_right = tmp | |
record = None | |
prev_record = None | |
last_record_time = None | |
while (line := sys.stdin.readline()) is not None: | |
if line.strip() == "": | |
break | |
if not record: | |
if line[0] in ('<','>'): | |
record = Record(line) | |
if prev_record != None: | |
if record.direction != prev_record.direction: | |
print(prev_record) | |
if prev_record.direction == ">" and dump_left: | |
dump_left.write(prev_record.data) | |
elif prev_record.direction == "<" and dump_right: | |
dump_right.write(prev_record.data) | |
prev_record = None | |
elif record.time - prev_record.time > args.coalesce: | |
print(prev_record) | |
if prev_record.direction == ">" and dump_left: | |
dump_left.write(prev_record.data) | |
elif prev_record.direction == "<" and dump_right: | |
dump_right.write(prev_record.data) | |
prev_record = None | |
else: | |
print(f"Discarding unexpected data: {line}", file=sys.stderr) | |
elif line[0:2] == "--": | |
record.validate() | |
if prev_record is not None: | |
prev_record.append_bytes(record.data) | |
prev_record.validate() | |
else: | |
prev_record = record | |
record = None | |
elif line[0] == " ": | |
record.append_data(line) | |
if prev_record is not None: | |
print(prev_record) | |
if prev_record.direction == ">" and dump_left: | |
dump_left.write(prev_record.data) | |
elif prev_record.direction == "<" and dump_right: | |
dump_right.write(prev_record.data) | |
if __name__ == "__main__": | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment