Skip to content

Instantly share code, notes, and snippets.

@minkione
Forked from RoganDawes/socat-x-process.py
Created March 17, 2024 21:40
Show Gist options
  • Save minkione/909251842055a059f3a62060f57c56a4 to your computer and use it in GitHub Desktop.
Save minkione/909251842055a059f3a62060f57c56a4 to your computer and use it in GitHub Desktop.
A python script to coalesce output from `socat -x` hexdumps.
#!/usr/bin/env python3
import sys
import argparse
import datetime
import datetime
import re
class TimeDeltaType(object):
"""
Interprets a string as a timedelta for argument parsing.
With no default unit:
>>> tdtype = TimeDeltaType()
>>> tdtype('5s')
datetime.timedelta(0, 5)
>>> tdtype('5.5s')
datetime.timedelta(0, 5, 500000)
>>> tdtype('5:06:07:08s')
datetime.timedelta(5, 22028)
>>> tdtype('5d06h07m08s')
datetime.timedelta(5, 22028)
>>> tdtype('5d')
datetime.timedelta(5)
With a default unit of minutes:
>>> tdmins = TimeDeltaType('m')
>>> tdmins('5s')
datetime.timedelta(0, 5)
>>> tdmins('5')
datetime.timedelta(0, 300)
>>> tdmins('6:05')
datetime.timedelta(0, 21900)
And some error cases:
>>> tdtype('5')
Traceback (most recent call last):
...
ValueError: Cannot infer units for '5'
>>> tdtype('5:5d')
Traceback (most recent call last):
...
ValueError: Colon not handled for unit 'd'
>>> tdtype('5:5ms')
Traceback (most recent call last):
...
ValueError: Colon not handled for unit 'ms'
>>> tdtype('5q')
Traceback (most recent call last):
...
ValueError: Unknown unit: 'q'
"""
units = {
'd': datetime.timedelta(days=1),
'h': datetime.timedelta(seconds=60 * 60),
'm': datetime.timedelta(seconds=60),
's': datetime.timedelta(seconds=1),
'ms': datetime.timedelta(microseconds=1000),
}
colon_mult_ind = ['h', 'm', 's']
colon_mults = [24, 60, 60]
unit_re = re.compile(r'[^\d:.,-]+', re.UNICODE)
def __init__(self, default_unit=None):
self.default_unit = default_unit
def __call__(self, val):
res = datetime.timedelta()
for num, unit in self._parse(val):
unit = unit.lower()
if ':' in num:
try:
colon_mults = self.colon_mults[:self.colon_mult_ind.index(unit) + 1]
except ValueError:
raise ValueError('Colon not handled for unit %r' % unit)
else:
colon_mults = []
try:
unit = self.units[unit]
except KeyError:
raise ValueError('Unknown unit: %r' % unit)
mult = 1
for part in reversed(num.split(':')):
res += self._mult_td(unit, (float(part) if '.' in part else int(part)) * mult)
if colon_mults:
mult *= colon_mults.pop()
return res
def _parse(self, val):
pairs = []
start = 0
for match in self.unit_re.finditer(val):
num = val[start:match.start()]
unit = match.group()
pairs.append((num, unit))
start = match.end()
num = val[start:]
if num:
if pairs or self.default_unit is None:
raise ValueError('Cannot infer units for %r' % num)
else:
pairs.append((num, self.default_unit))
return pairs
@staticmethod
def _mult_td(td, mult):
# Necessary because timedelta * float is not supported:
return datetime.timedelta(days=td.days * mult, seconds=td.seconds * mult, microseconds=td.microseconds * mult)
def mkdelta(str):
return TimeDeltaType()(str)
class hexdump:
def __init__(self, buf, asc=False):
self.buf = buf
self.asc = asc
def asciify(self, bs):
if not self.asc:
return ""
else:
return " {:16}".format(
"".join((chr(x) if 32 <= x < 127 else "." for x in bs)))
def __iter__(self):
last_bs, last_line = None, None
for i in range(0, len(self.buf), 16):
bs = bytearray(self.buf[i : i + 16])
line = " {:23} {:23}{}".format(
" ".join(("{:02x}".format(x) for x in bs[:8])),
" ".join(("{:02x}".format(x) for x in bs[8:])),
self.asciify(bs),
)
if bs == last_bs:
line = "*"
if bs != last_bs or line != last_line:
yield line
last_bs, last_line = bs, line
def __str__(self):
return "\n".join(self)
def __repr__(self):
return "\n".join(self)
class Record:
direction = { ">":">", "<":"<" }
quiet = False
asc = True
def __init__(self, line):
parts = line.split()
self.direction=parts[0]
self.time = datetime.datetime.strptime(" ".join(parts[1:3]), '%Y/%m/%d %H:%M:%S.%f')
self.length = int(parts[3].split("=")[1])
self.start = int(parts[4].split("=")[1])
self.to = int(parts[5].split("=")[1])
self.data = bytearray()
def append_data(self, str):
str = str[1:]
if len(str) > 48:
str = str[:48]
self.data += bytes.fromhex(str)
def append_bytes(self, data):
self.data += data
self.length += len(data)
self.to += len(data)
def validate(self):
assert len(self.data) == self.length, f"{len(self.data)} != {self.length}"
assert self.length == self.to - self.start + 1, f"{self.length} != {self.to} - {self.start} + 1"
def __repr__(self):
ret = Record.direction[self.direction]
if not Record.quiet:
ret += f" {self.time.strftime('%Y/%m/%d %H:%M:%S.%f')} length={self.length} from={self.start} to={self.to}"
ret += "\n" + str(hexdump(self.data, Record.asc)) + "\n" + "--"
return ret
def init_argparse() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
usage="%(prog)s [OPTION] [FILE]...",
description="Pipeline to post-process output of 'socat -x' in various ways.",
)
parser.add_argument(
"-v", "--version", action="version",
version=f"{parser.prog} version 1.0.0"
)
parser.add_argument(
"-c", "--coalesce", help="Coalesce data travelling in the same direction, within the specified duration",
type=mkdelta, default=mkdelta("0s"))
parser.add_argument(
"-l", "--left", help="Provide a description for data travelling from left to right '>'")
parser.add_argument(
"-r", "--right", help="Provide a description for data travelling from right to left '<'")
parser.add_argument(
"-i", "--incremental", help="Change timing information to incremental i.e. time difference from previous packet")
parser.add_argument(
"-q", "--quiet", action="store_true", help="Reduce the verbosity of the output to help align diffs")
parser.add_argument(
"-x", "--switch", action="store_true", help="Switch indicated direction of communications")
parser.add_argument(
"--dump-left", help="Saves the bytes of the left direction to the designated file. Affected by '-x'")
parser.add_argument(
"--dump-right", help="Saves the bytes of the right direction to the designated file. Affected by '-x'")
return parser
def main() -> None:
parser = init_argparse()
args = parser.parse_args()
dump_left = None
dump_right = None
if args.left:
Record.direction[">"] = args.left
if args.right:
Record.direction["<"] = args.right
if args.quiet:
Record.quiet = args.quiet
if args.dump_left:
dump_left = open(args.dump_left, "wb")
if args.dump_right:
dump_right = open(args.dump_right, "wb")
if args.switch:
tmp = Record.direction[">"]
Record.direction[">"] = Record.direction["<"]
Record.direction["<"] = tmp
tmp = dump_left
dump_left = dump_right
dump_right = tmp
record = None
prev_record = None
last_record_time = None
while (line := sys.stdin.readline()) is not None:
if line.strip() == "":
break
if not record:
if line[0] in ('<','>'):
record = Record(line)
if prev_record != None:
if record.direction != prev_record.direction:
print(prev_record)
if prev_record.direction == ">" and dump_left:
dump_left.write(prev_record.data)
elif prev_record.direction == "<" and dump_right:
dump_right.write(prev_record.data)
prev_record = None
elif record.time - prev_record.time > args.coalesce:
print(prev_record)
if prev_record.direction == ">" and dump_left:
dump_left.write(prev_record.data)
elif prev_record.direction == "<" and dump_right:
dump_right.write(prev_record.data)
prev_record = None
else:
print(f"Discarding unexpected data: {line}", file=sys.stderr)
elif line[0:2] == "--":
record.validate()
if prev_record is not None:
prev_record.append_bytes(record.data)
prev_record.validate()
else:
prev_record = record
record = None
elif line[0] == " ":
record.append_data(line)
if prev_record is not None:
print(prev_record)
if prev_record.direction == ">" and dump_left:
dump_left.write(prev_record.data)
elif prev_record.direction == "<" and dump_right:
dump_right.write(prev_record.data)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment