Skip to content

Instantly share code, notes, and snippets.

@Tatsh
Created July 5, 2020 00:07
Show Gist options
  • Save Tatsh/436a9f7aeb0e04a5f9ec1e053db92076 to your computer and use it in GitHub Desktop.
Save Tatsh/436a9f7aeb0e04a5f9ec1e053db92076 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from datetime import datetime
from os import fstat
from struct import unpack
from typing import (Any, BinaryIO, Mapping, MutableMapping, List, Optional,
Sequence, Union, cast)
import json
import sys
# Names exported by a star-import of this module.
__all__ = ('SolMapping', 'parse_sol')
# Read-only view of a parsed object: keys are ints or strings, values are any
# parsed item.
SolMapping = Mapping[Union[int, str], Any]
# The value types that ``_read_item`` is annotated to return.
SolItem = Union[Optional[str], Sequence[Any], SolMapping, bool, bytes,
datetime, float, int]
# Mutable counterpart of ``SolMapping``, used while a file is being parsed.
SolMutableMapping = MutableMapping[Union[int, str], Any]
# Three-byte terminator (00 00 09) that ``_read_object`` compares against to
# detect the end of an object.
END_OBJECT_SEQUENCE = b'\x00\x00\t'
def _read_sequence_or_object(
        f: BinaryIO,
        obj: Optional[SolMutableMapping] = None
) -> Union[Sequence[Any], SolMapping]:
    """Read an array-style value: a 32-bit count followed by key/value pairs.

    A count of zero hands over to ``_read_object`` and the resulting mapping
    is returned. Otherwise up to ``count`` pairs are read and only the values
    are kept, in file order; the keys are recorded in ``obj`` but are not part
    of the result.

    Args:
        f: Binary stream positioned just after the value's type byte.
        obj: Scratch mapping that receives the keys seen while reading. A
            fresh dict is used when this is ``None`` or empty.

    Returns:
        A mapping when the count is zero, otherwise the list of values. An
        empty list is returned when the terminator is not found where it is
        expected.
    """
    count = _read_long(f)
    if not count:
        return _read_object(f)
    if not obj:
        obj = {}
    ret: List[SolItem] = []
    for _ in range(count):
        index = _read_str(f)
        if not index:
            # An empty key is the first two bytes (00 00) of the terminator
            # and has already been consumed, so only the final byte is left
            # to check. Reading three more bytes here would misalign the
            # stream and throw away the values collected so far.
            if f.read(1) == END_OBJECT_SEQUENCE[-1:]:
                return ret
            return []
        obj[index] = None
        # The item is registered under its position in the scratch mapping.
        ret.append(_read_item(f, obj, len(obj) - 1))
    if f.read(3) == END_OBJECT_SEQUENCE:
        return ret
    return []
def _read_str(f: BinaryIO) -> Optional[str]:
s_len = cast(int, unpack('>H', f.read(2))[0])
if s_len:
s = f.read(s_len)
return s.decode()
return None
def _read_object(f: BinaryIO,
                 obj: Optional[SolMutableMapping] = None) -> SolMapping:
    """Read key/value pairs until the object terminator is reached.

    Before each pair the next three bytes are inspected: if they equal
    ``END_OBJECT_SEQUENCE`` the object is complete, otherwise the stream is
    rewound by three bytes and a key followed by an item is read.

    Args:
        f: Binary stream positioned at the first key (or at the terminator).
        obj: Scratch mapping handed to ``_read_item`` for each key. A fresh
            dict is used when this is ``None`` or empty.

    Returns:
        The mapping of keys to parsed items; empty when the terminator comes
        first.
    """
    result: SolMutableMapping = {}
    while True:
        marker = unpack('3s', f.read(3))[0]
        if marker == END_OBJECT_SEQUENCE:
            return result
        # Not the terminator: those three bytes belong to the next key.
        f.seek(-3, 1)
        key = _read_str(f)
        assert key is not None
        obj = obj or {}
        obj[key] = None
        result[key] = _read_item(f, obj, key)
def _read_long(f: BinaryIO) -> int:
return cast(int, unpack('>L', f.read(4))[0])
def _read_item(f: BinaryIO,
mapping: SolMutableMapping,
key: Optional[Union[str, int]] = None) -> SolItem:
def read_number(f: BinaryIO) -> float:
return cast(float, unpack('>d', f.read(8))[0])
type_ = cast(int, unpack('B', f.read(1))[0])
if type_ == 0x2:
return _read_str(f)
if type_ == 0x0:
return read_number(f)
if type_ == 0x10:
return 'unknown custom class'
if type_ == 0x8:
if key:
mapping[key] = {}
return _read_sequence_or_object(f, mapping[key])
return _read_sequence_or_object(f)
if type_ == 0x3:
assert key is not None
mapping[key] = {}
return _read_object(f, mapping[key])
if type_ == 0xf:
return f.read(_read_long(f)).decode()
if type_ == 0x1:
return cast(bool, unpack('?', f.read(1))[0])
if type_ == 0xb:
basetime = datetime(1970, 1, 1).timestamp()
num = read_number(f)
sec = num / 1000 + basetime
return datetime.fromtimestamp(sec)
return f'unknown type 0x{type_:x}'
def parse_sol(filename: str) -> Mapping[str, SolMapping]:
    """Parse a local shared object (AMF0 only) file.

    Args:
        filename: Path of the ``.sol`` file to read.

    Returns:
        A single-entry mapping from the shared object's name to a mapping of
        its top-level keys and parsed values.

    Raises:
        ValueError: If the ``TCSO`` signature is missing, the shared object
            name is empty, or the AMF version field is neither 0 nor 3.
        NotImplementedError: If the file declares AMF version 3.
    """
    with open(filename, 'rb') as f:
        total_size = fstat(f.fileno()).st_size
        mapping: SolMutableMapping = {}
        # The four-byte signature sits at offset 6.
        f.seek(6)
        if f.read(4) != b'TCSO':
            raise ValueError('Invalid header')
        # The length-prefixed shared object name starts at offset 16.
        f.seek(16)
        so_name = _read_str(f)
        if so_name is None:
            # Raised explicitly rather than asserted so the check survives
            # ``python -O`` and matches the other header errors.
            raise ValueError('Missing shared object name')
        amf_version = cast(int, unpack('>i', f.read(4))[0])
        if amf_version not in (0, 3):
            raise ValueError('Unsupported SOL format')
        if amf_version == 3:
            raise NotImplementedError('AMF 3 not implemented')
        while f.tell() < total_size:
            key = _read_str(f)
            if not key:
                break
            mapping[key] = _read_item(f, mapping, key)
            # Each top-level item is expected to be followed by one zero
            # byte; stop parsing when it is absent.
            if f.read(1) != b'\x00':
                break
        return {so_name: mapping}
if __name__ == '__main__':
    # Command-line entry point: parse the given file and print it as JSON.
    if len(sys.argv) != 2:
        print(f'Usage: {sys.argv[0]} FILE', file=sys.stderr)
        sys.exit(1)

    def _json_default(value: Any) -> str:
        """Serialise parsed dates, which ``json`` cannot encode natively."""
        if isinstance(value, datetime):
            return value.isoformat()
        # Anything else is unexpected; fail the same way json itself would.
        raise TypeError(
            f'Object of type {type(value).__name__} is not JSON serializable')

    print(json.dumps(parse_sol(sys.argv[1]), default=_json_default))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment