Created
July 5, 2020 00:07
-
-
Save Tatsh/436a9f7aeb0e04a5f9ec1e053db92076 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from datetime import datetime | |
from os import fstat | |
from struct import unpack | |
from typing import (Any, BinaryIO, Mapping, MutableMapping, List, Optional, | |
Sequence, Union, cast) | |
import json | |
import sys | |
# Names exported by a star-import of this module.
__all__ = ('SolMapping', 'parse_sol')

# Read-only mapping of parsed entries, keyed by string or integer.
SolMapping = Mapping[Union[int, str], Any]

# Every type a single parsed item may have.
SolItem = Union[
    Optional[str],
    Sequence[Any],
    SolMapping,
    bool,
    bytes,
    datetime,
    float,
    int
]

# Mutable counterpart of SolMapping, used while entries are being collected.
SolMutableMapping = MutableMapping[Union[int, str], Any]

# Three bytes (0x00 0x00 0x09) that the object reader treats as "no more
# key/value pairs".
END_OBJECT_SEQUENCE = b'\x00\x00\t'
def _read_sequence_or_object(
        f: BinaryIO,
        obj: Optional[SolMutableMapping] = None
) -> Union[Sequence[Any], SolMapping]:
    """Read the payload of an array item: a 32-bit count, then its entries.

    When the count is zero the payload is handed to ``_read_object`` and the
    resulting mapping is returned. Otherwise up to ``count`` key/value pairs
    are read and only the values are kept, in stream order.

    Args:
        f: Binary stream positioned just after the array type marker.
        obj: Scratch mapping in which every key read is registered (with
            ``None``) before its value is decoded. A missing or empty
            mapping is replaced by a fresh dict.

    Returns:
        The mapping from ``_read_object`` for a zero count; otherwise the
        list of values, or an empty list when the three bytes following the
        entries are not ``00 00 09``.
    """
    declared = _read_long(f)
    if declared == 0:
        return _read_object(f)

    scratch: SolMutableMapping = obj if obj else {}
    values: List[SolItem] = []
    for _ in range(declared):
        name = _read_str(f)
        if not name:
            # A zero-length key ends the entries early.
            break
        scratch[name] = None
        # The value reader receives the entry's position in the scratch
        # mapping rather than its name.
        values.append(_read_item(f, scratch, len(scratch) - 1))

    terminator = f.read(3)
    return values if terminator == b'\x00\x00\x09' else []
def _read_str(f: BinaryIO) -> Optional[str]:
    """Read a string prefixed by a 16-bit big-endian length.

    Args:
        f: Binary stream positioned at the length prefix.

    Returns:
        The decoded string, or ``None`` when the length prefix is zero.
    """
    (length,) = unpack('>H', f.read(2))
    if not length:
        return None
    return f.read(length).decode()
def _read_object(f: BinaryIO,
                 obj: Optional[SolMutableMapping] = None) -> SolMapping:
    """Read key/value pairs until ``END_OBJECT_SEQUENCE`` is encountered.

    Args:
        f: Seekable binary stream positioned at the first key (or at the
            end-of-object bytes for an empty object).
        obj: Scratch mapping in which every key is registered (with
            ``None``) before its value is decoded. A missing or empty
            mapping is replaced by a fresh dict.

    Returns:
        A mapping of each key to its decoded value; empty when the object
        ends immediately.
    """
    fields: SolMutableMapping = {}
    while True:
        # Peek at the next three bytes to see whether the object has ended.
        marker = unpack('3s', f.read(3))[0]
        if marker == END_OBJECT_SEQUENCE:
            return fields
        # Not the terminator: rewind so the bytes are re-read as a key.
        f.seek(-3, 1)
        name = _read_str(f)
        assert name is not None
        obj = obj or {}
        obj[name] = None
        fields[name] = _read_item(f, obj, name)
def _read_long(f: BinaryIO) -> int:
    """Read a 32-bit big-endian unsigned integer from ``f``."""
    raw = f.read(4)
    (value,) = unpack('>L', raw)
    return cast(int, value)
def _read_item(f: BinaryIO,
               mapping: SolMutableMapping,
               key: Optional[Union[str, int]] = None) -> SolItem:
    """Read one type-tagged value from ``f``.

    A single marker byte selects how the payload is decoded:

    * ``0x0``: 8-byte big-endian double.
    * ``0x1``: one byte decoded as a boolean.
    * ``0x2``: string with a 16-bit length prefix.
    * ``0x3``: object, read by ``_read_object``.
    * ``0x8``: array, read by ``_read_sequence_or_object``.
    * ``0xb``: date - milliseconds since the Unix epoch as a double,
      followed by a 2-byte time zone field.
    * ``0xf``: string with a 32-bit length prefix.
    * ``0x10``: custom class; nothing is decoded and a placeholder string
      is returned.

    Args:
        f: Binary stream positioned at the type marker.
        mapping: Mapping that receives an empty-dict placeholder under
            ``key`` before a nested object or array is read.
        key: Key of the value being read. Required for objects (``0x3``).

    Returns:
        The decoded value. Dates are returned as naive ``datetime`` objects
        expressed in UTC. Unrecognised markers yield the string
        ``'unknown type 0x..'``.

    Raises:
        struct.error: If the stream ends in the middle of a fixed-size field.
        AssertionError: If an object marker is read while ``key`` is None.
    """

    def read_number(f: BinaryIO) -> float:
        return cast(float, unpack('>d', f.read(8))[0])

    type_ = cast(int, unpack('B', f.read(1))[0])
    if type_ == 0x2:
        return _read_str(f)
    if type_ == 0x0:
        return read_number(f)
    if type_ == 0x10:
        return 'unknown custom class'
    if type_ == 0x8:
        # A falsy key (None, 0 or '') gets no placeholder in ``mapping``.
        if key:
            mapping[key] = {}
            return _read_sequence_or_object(f, mapping[key])
        return _read_sequence_or_object(f)
    if type_ == 0x3:
        assert key is not None
        mapping[key] = {}
        return _read_object(f, mapping[key])
    if type_ == 0xf:
        return f.read(_read_long(f)).decode()
    if type_ == 0x1:
        return cast(bool, unpack('?', f.read(1))[0])
    if type_ == 0xb:
        from datetime import timedelta

        millis = read_number(f)
        # An AMF0 date carries a 2-byte time zone field after the double.
        # It must be consumed, otherwise every later read is misaligned.
        f.read(2)
        # Add to the epoch directly instead of going through local-time
        # timestamps, so the result does not depend on the host's time zone
        # or DST rules.
        return datetime(1970, 1, 1) + timedelta(milliseconds=millis)
    return f'unknown type 0x{type_:x}'
def parse_sol(filename: str) -> Mapping[str, SolMapping]:
    """Parse a local shared object file (AMF version 0 only).

    Args:
        filename: Path of the file to read.

    Returns:
        A single-entry mapping from the shared object's name to a mapping
        of its top-level keys and decoded values.

    Raises:
        ValueError: If the ``TCSO`` signature is missing at offset 6, or the
            version field is neither 0 nor 3.
        NotImplementedError: If the version field is 3.
    """
    entries: SolMutableMapping = {}
    with open(filename, 'rb') as sol:
        size = fstat(sol.fileno()).st_size

        sol.seek(6)
        signature = sol.read(4)
        if signature != b'TCSO':
            raise ValueError('Invalid header')

        # The shared object's name starts at offset 16.
        sol.seek(16)
        so_name = _read_str(sol)
        assert so_name is not None

        (amf_version,) = unpack('>i', sol.read(4))
        if amf_version == 3:
            raise NotImplementedError('AMF 3 not implemented')
        if amf_version != 0:
            raise ValueError('Unsupported SOL format')

        more = sol.tell() < size
        while more:
            name = _read_str(sol)
            if not name:
                break
            entries[name] = _read_item(sol, entries, name)
            # Each top-level value must be followed by a single zero byte;
            # anything else stops the parse.
            more = sol.read(1) == b'\x00' and sol.tell() < size
    return {so_name: entries}
if __name__ == '__main__':

    def _json_default(value: Any) -> str:
        """Convert parsed values that ``json`` cannot encode natively.

        Date items are parsed into ``datetime`` objects, which ``json.dumps``
        rejects; they are emitted as ISO 8601 strings instead.
        """
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(
            f'Object of type {type(value).__name__} is not JSON serializable')

    # Command-line entry point: dump the parsed file as JSON on stdout.
    if len(sys.argv) != 2:
        print(f'Usage: {sys.argv[0]} FILE', file=sys.stderr)
        sys.exit(1)
    print(json.dumps(parse_sol(sys.argv[1]), default=_json_default))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment