Created
May 17, 2012 03:50
-
-
Save youzaka/2716124 to your computer and use it in GitHub Desktop.
declarative ts parser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from collections import defaultdict | |
from io import BufferedReader, FileIO | |
from types import FunctionType | |
class SyntaxTable(dict):
    """Class-body namespace that remembers mnemonic fields in declaration order.

    Two parallel lists are maintained while a class body executes:

    * ``history`` -- the ``length`` of every mnemonic assigned so far.
    * ``keys`` -- the attribute name each of those mnemonics was bound to.

    Note that the ``keys`` list deliberately lives on the instance and
    therefore shadows ``dict.keys`` for objects of this type.
    """

    def __init__(self):
        self.history = []
        self.keys = []

    def __setitem__(self, key, value):
        """Store ``value`` and, if it is a mnemonic, register it as a field."""
        if isinstance(value, mnemonic):
            # Hand the field a snapshot of everything declared before it so
            # that it can later work out its own offset inside the packet.
            value.history = self.history[:]
            self.history.append(value.length)
            self.keys.append(key)
        super().__setitem__(key, value)
class SyntaxClass(type):
    """Metaclass that captures the field layout declared in a class body.

    The class body is executed inside a ``SyntaxTable``; once the class has
    been built, the recorded field lengths and field names are attached to it
    as ``_history`` and ``_keys``.
    """

    @classmethod
    def __prepare__(metacls, name, bases):
        """Return the namespace object used while the class body runs."""
        return SyntaxTable()

    def __new__(cls, name, bases, classdict):
        """Build the class from a plain-dict copy of the recorded namespace."""
        namespace = dict(classdict)
        klass = super().__new__(cls, name, bases, namespace)
        klass._history = classdict.history
        klass._keys = classdict.keys
        return klass
class mnemonic(object):
    """Base class for the field descriptors declared inside a table body.

    Attributes:
        length: Size specification of the field.  It may be an ``int``, a
            ``str`` naming an attribute of the owning instance, or a callable
            that receives the owning instance.
        history: Size specifications of the fields declared before this one.
            It starts empty and is replaced by the class-body namespace
            (``SyntaxTable``) when the descriptor is assigned.
    """

    def __init__(self, length):
        self.length = length
        self.history = []

    def real_length(self, length, instance):
        """Resolve a size specification against ``instance``.

        Args:
            length: An ``int``, an attribute name, or a callable.
            instance: The object the field is being read from.

        Returns:
            * callable -- ``length(instance) * 8`` (the result is scaled by
              eight, i.e. treated as a byte count converted to bits);
            * ``str`` -- the value of that attribute on ``instance``,
              returned unscaled;
            * anything else -- ``length`` unchanged.
        """
        # Any callable is accepted (plain functions, lambdas, builtins,
        # functools.partial objects, bound methods), not only FunctionType.
        if callable(length):
            return length(instance) * 8
        if isinstance(length, str):
            return getattr(instance, length)
        return length
class uimsbf(mnemonic):
    """Unsigned integer field, most significant bit first.

    Used as a descriptor: reading the attribute on an instance extracts
    ``self.length`` bits from ``instance._packet``, starting right after all
    the fields that were declared before this one.
    """

    def __get__(self, instance, owner):
        """Return the field value read from ``instance._packet``."""
        # The bit offset is the combined size of every preceding field.
        start = sum(self.real_length(length, instance) for length in self.history)
        return self.uimsbf(instance._packet, start, self.length)

    @staticmethod
    def uimsbf(packet, current, length):
        """Read ``length`` bits starting at bit offset ``current``.

        Bits are numbered from the most significant bit of ``packet[0]``.
        The field may start and end at any bit position and may span any
        number of bytes.

        Args:
            packet: Indexable sequence of byte values (0-255).
            current: Bit offset of the first bit of the field.
            length: Number of bits to read.

        Returns:
            The field as a non-negative ``int``.  Bytes that lie beyond the
            end of ``packet`` are read as zero bits, so a field that starts
            entirely past the end yields ``0``.
        """
        value = 0
        remaining = length
        while remaining > 0:
            block, pos = divmod(current, 8)
            available = 8 - pos              # unread bits left in this byte
            take = min(available, remaining)
            try:
                byte = packet[block]
            except IndexError:
                # Ran off the end of the packet: pad the rest with zeros.
                return value << remaining
            # Drop the bits to the right of the wanted span, then mask off
            # the bits to its left.
            chunk = (byte >> (available - take)) & ((1 << take) - 1)
            value = (value << take) | chunk
            current += take
            remaining -= take
        return value
class bslbf(mnemonic):
    """Bit string field, left bit first.

    A one-bit field reads as a ``bool``; a wider field reads as a ``str`` of
    ``'0'`` / ``'1'`` characters, leftmost bit first.
    """

    def __get__(self, instance, owner):
        """Return the flag or bit string read from ``instance._packet``."""
        # The bit offset is the combined size of every preceding field.
        offset = sum(self.real_length(size, instance) for size in self.history)
        read_bits = uimsbf.uimsbf
        if self.length != 1:
            digits = []
            for position in range(offset, offset + self.length):
                digits.append(str(read_bits(instance._packet, position, 1)))
            return ''.join(digits)
        return read_bits(instance._packet, offset, self.length) == 1


# Fields declared as "rpchof" are read exactly like bit strings.
rpchof = bslbf
class nsize(mnemonic):
    """Repeated sub-structure occupying a given span of the packet.

    Reading the attribute yields successive ``cls`` objects parsed from the
    span until the span is used up.

    Attributes:
        cls: Class instantiated for every element; it is called with the
            not-yet-consumed part of the span.
        length: Size specification of the whole span (see
            ``mnemonic.real_length``).
    """

    def __init__(self, cls, length):
        self.cls = cls
        mnemonic.__init__(self, length)

    def __get__(self, instance, owner):
        """Yield the elements contained in the span, in order."""
        size = self.real_length(self.length, instance) // 8
        # Resolve every preceding size the same way the sibling descriptors
        # do, so that callable sizes in the history are supported as well.
        start = sum(self.real_length(prior, instance)
                    for prior in self.history) // 8
        end = start + size
        while start < end:
            obj = self.cls(instance._packet[start:end])
            yield obj
            step = len(obj) // 8
            if step <= 0:
                # An element that reports less than one byte would never
                # advance the cursor; stop instead of looping forever.
                return
            start += step
class Table(metaclass=SyntaxClass):
    """Base class for structures whose fields are declared as descriptors.

    The metaclass attaches ``_history`` (size specification of every declared
    field) and ``_keys`` (the field names) to each subclass.
    """

    def __init__(self, packet):
        """Wrap ``packet``, the raw byte sequence the fields are read from."""
        self._packet = packet

    def __len__(self):
        """Return the combined size of all declared fields.

        Size specifications are resolved like ``mnemonic.real_length`` does:
        a callable is called with ``self`` and its result multiplied by 8, a
        ``str`` names an attribute of ``self``, anything else is used as is.
        """
        total = 0
        for length in self._history:
            if callable(length):
                total += length(self) * 8
            elif isinstance(length, str):
                total += getattr(self, length)
            else:
                total += length
        return total

    def __str__(self):
        """Return one ``name: value`` line per declared field."""
        return "\n".join("{}: {}".format(field, getattr(self, field))
                         for field in self._keys)

    def __getitem__(self, key):
        """Index or slice the underlying packet."""
        return self._packet[key]

    @classmethod
    def loop(cls, length=None, times=None):
        """Return an ``nsize`` field that repeats this class over ``length``.

        Args:
            length: Size specification of the repeated span.  When it is
                falsy, ``None`` is returned.
            times: Accepted but currently unused.
        """
        if length:
            return nsize(cls, length)
        return None
def loop(length):
    """Return a class decorator that turns a class into a repeated field.

    The decorated class is replaced by ``nsize(cls, length)``, where
    ``length`` is the size specification of the whole repeated span.
    """
    def as_repeated_field(cls):
        return nsize(cls, length)

    return as_repeated_field
class TransportStreamFile(BufferedReader):
    """Buffered binary reader that iterates over fixed-size transport packets."""

    PACKET_SIZE = 188

    def __init__(self, path):
        """Open the file at ``path`` for unbuffered raw reading underneath."""
        super().__init__(FileIO(path))

    def __next__(self):
        """Return the next packet; stop as soon as a full packet is not available."""
        chunk = bytearray(self.read(self.PACKET_SIZE))
        if len(chunk) == self.PACKET_SIZE:
            return TransportPacket(chunk)
        raise StopIteration

    def table(self, Table):
        """Yield ``Table`` objects reassembled from the packets of this file.

        Only packets whose PID is listed in ``Table._pids`` and that carry a
        payload are considered.  Payloads are accumulated per PID; when a
        packet with ``payload_unit_start_indicator`` arrives, the data
        gathered so far for that PID is emitted (provided ``Table`` has no
        ``_table_ids`` or the first byte is one of them) and a new buffer is
        started from that packet's payload.

        NOTE: data still buffered when the file ends is not yielded.
        """
        pending = defaultdict(bytearray)
        for packet in self:
            pid = packet.PID
            if not (pid in Table._pids and packet.has_payload):
                continue
            if not packet.payload_unit_start_indicator:
                pending[pid].extend(packet.payload)
                continue
            section = pending[pid]
            if section and (not hasattr(Table, '_table_ids') or
                            section[0] in Table._table_ids):
                yield Table(section)
            pending[pid] = packet.payload
class TransportPacket(Table):
    """One transport stream packet: a 4-byte header plus optional parts."""

    sync_byte = uimsbf(8)
    transport_error_indicator = bslbf(1)
    payload_unit_start_indicator = bslbf(1)
    transport_priority = bslbf(1)
    PID = uimsbf(13)
    transport_scrambling_control = bslbf(2)
    adaptation_field_control = bslbf(2)
    continuity_counter = uimsbf(4)

    @property
    def has_adaptation(self):
        """True when the first bit of ``adaptation_field_control`` is set."""
        # The field reads as a string such as '01'; both '0' and '1' are
        # truthy, so the character has to be compared explicitly.
        return self.adaptation_field_control[0] == '1'

    @property
    def has_payload(self):
        """True when the second bit of ``adaptation_field_control`` is set."""
        return self.adaptation_field_control[1] == '1'

    @property
    def payload(self):
        """Return the payload bytes of this packet.

        The 4-byte header is skipped, followed by the adaptation field (one
        length byte plus that many bytes) when one is present.  If the packet
        starts a new payload unit and the payload does not begin with the
        ``00 00 01`` start-code prefix, the first payload byte is taken as a
        pointer field and it and the bytes it points over are skipped too.

        Returns:
            A slice of the packet; it is empty when the packet carries no
            payload.
        """
        if not self.has_payload:
            return self._packet[:0]
        start = 4
        if self.has_adaptation:
            start += 1 + self._packet[start]
        # A start-code prefix marks a PES packet, which has no pointer field;
        # any other payload unit start is assumed to carry one.
        if (self.payload_unit_start_indicator and
                start < len(self._packet) and
                self._packet[start:start + 3] != b"\x00\x00\x01"):
            start += 1 + self._packet[start]
        return self._packet[start:]
class ProgramAssociationTable(Table):
    """Declarative layout of a program association section.

    Fields are laid out in declaration order, so the order below is part of
    the format and must not be changed.
    """

    # Consulted by TransportStreamFile.table(): the PIDs to collect and the
    # accepted values of the first byte of a reassembled section.
    _pids = [0x00]
    _table_ids = [0x00]

    table_id = uimsbf(8)
    section_syntax_indicator = bslbf(1)
    reserved_future_use = bslbf(1)
    reserved_1 = bslbf(2)
    section_length = uimsbf(12)
    transport_stream_id = uimsbf(16)
    resered_2 = bslbf(2)  # (sic) attribute name kept as originally declared
    version_number = uimsbf(5)
    current_next_indicator = bslbf(1)
    section_number = uimsbf(8)
    last_section_number = uimsbf(8)

    # The repeated entries span section_length - 9 units (scaled by 8 when
    # resolved); presumably section_length minus the 5 bytes declared between
    # it and the loop and the 4-byte CRC -- confirm against the specification.
    @loop(lambda section: section.section_length - 9)
    class pids(Table):
        """One entry of the program loop."""

        program_number = uimsbf(16)
        reserved = bslbf(3)
        program_map_PID = uimsbf(13)

    CRC_32 = rpchof(32)
def main(argv=None):
    """Print the program entries and CRC of every PAT section in a file.

    For each section, one line ``program_number program_map_PID`` is printed
    per entry, followed by a line with the section's ``CRC_32`` bit string.

    Args:
        argv: Argument vector with the program name first and the file path
            second; defaults to ``sys.argv``.  Further arguments are ignored.

    Returns:
        The process exit status: ``0`` on success, ``2`` when the file path
        is missing.
    """
    import sys

    args = sys.argv if argv is None else argv
    if len(args) < 2:
        # Report the mistake instead of failing with a bare IndexError.
        prog = args[0] if args else "tsparser"
        print("usage: {} FILE".format(prog), file=sys.stderr)
        return 2
    with TransportStreamFile(args[1]) as ts:
        for pat in ts.table(ProgramAssociationTable):
            for patmap in pat.pids:
                print(patmap.program_number, patmap.program_map_PID)
            print(pat.CRC_32)
    return 0


if __name__ == '__main__':
    import sys
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment