Created
May 31, 2020 04:54
-
-
Save Tatsh/c3adffdb12d5f2cc064caa04c63fb52e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from dataclasses import dataclass | |
from os.path import isdir, join | |
from struct import unpack | |
from typing import Optional, Tuple | |
import logging | |
import os | |
import sys | |
__all__ = ('extract_concatenated_oggs', ) | |
# Values of the Ogg page ``header_type`` byte that the extractor recognises.
TYPE_BOS = 2  # first page of a logical stream
TYPE_CONTINUE = 1  # page continues a packet started on the previous page
TYPE_EOS = 4  # last page of a logical stream
TYPE_EOS_2 = 5  # last page that also continues a packet (4 | 1)
TYPE_FREE = 0  # page with no flags set


@dataclass
class ReadState:
    """Mutable parser state for the Ogg stream currently being collected.

    All four attributes are real dataclass fields, so they appear in
    ``__init__``, ``__repr__`` and ``__eq__``.  ``expected_type`` and
    ``last_page_sequence`` stay first so that positional construction keeps
    its original meaning.
    """
    # Header types that the next page is allowed to carry.
    expected_type: Tuple[int, ...] = (TYPE_BOS, )
    # Sequence number of the previous page; None before the first page.
    last_page_sequence: Optional[int] = None
    # Raw bytes gathered so far for the current stream.
    contents: bytes = b''
    # True once the page that ends the current stream has been read.
    end_of_file: bool = False

    def expect_end_or_continue(self) -> None:
        """Allow any page type except a beginning-of-stream page next."""
        self.expected_type = (TYPE_EOS, TYPE_EOS_2, TYPE_CONTINUE, TYPE_FREE)

    def expect_beginning(self) -> None:
        """Require the next page to be a beginning-of-stream page."""
        self.expected_type = (TYPE_BOS, )

    def reset(self) -> None:
        """Return every field to its initial value, ready for a new stream."""
        self.contents = b''
        self.end_of_file = False
        self.expect_beginning()
        self.last_page_sequence = None
def extract_concatenated_oggs(input_file: str, outdir: str = './out') -> int:
    """
    Searches a file of concatenated OGG files and extracts them into
    numbered files into outdir.

    The input is scanned four bytes at a time for the ``OggS`` capture
    pattern.  Each page found is validated against the expected header type
    and page sequence number; when a page flagged as end-of-stream has been
    read, everything collected since the previous stream ended is written to
    ``outdir/NNNNN.ogg`` (numbered from 00001).  ``outdir`` is created if it
    does not exist.

    Raises AssertionError if a page has an unexpected header type, a
    non-consecutive sequence number, or a header cut off by end of file.

    Returns the number of files written.
    """
    # Layout of the 23 header bytes that follow the 4-byte capture pattern:
    # version (skipped), header type, granule position (skipped), serial
    # number (skipped), page sequence number, checksum (skipped), segment
    # count.
    header_format = '<xB8x4xI4xB'
    header_size = 23

    log = logging.getLogger('extractor')
    state = ReadState()
    written = 0
    # Pieces of the stream being collected; joined only when written out so
    # the accumulated data is not re-copied on every read.
    chunks = []
    os.makedirs(outdir, exist_ok=True)
    with open(input_file, 'rb') as f:
        while True:
            offset = f.tell()
            magic = f.read(4)
            if not magic:
                break
            chunks.append(magic)
            if magic != b'OggS':
                # Not a page start: the bytes stay part of the current
                # output and scanning continues at the next 4-byte step.
                continue
            header = f.read(header_size)
            if len(header) < header_size:
                raise AssertionError(
                    f'offset 0x{offset:x}: truncated Ogg page header')
            header_type, page_sequence, page_segments = unpack(
                header_format, header)
            # Raised explicitly (rather than with ``assert``) so the checks
            # survive ``python -O`` while callers still see AssertionError.
            if header_type not in state.expected_type:
                raise AssertionError(
                    f'offset 0x{offset:x}: unexpected header type '
                    f'{header_type}, expected one of {state.expected_type}')
            if (state.last_page_sequence is not None
                    and page_sequence != state.last_page_sequence + 1):
                raise AssertionError(
                    f'offset 0x{offset:x}: page sequence {page_sequence} '
                    f'does not follow {state.last_page_sequence}')
            state.last_page_sequence = page_sequence
            if header_type in (TYPE_EOS, TYPE_EOS_2):
                state.expect_beginning()
                state.end_of_file = True
            else:
                state.expect_end_or_continue()
            # The segment table holds one length byte per segment; the page
            # payload is the sum of those lengths.
            segment_table = f.read(page_segments)
            payload = f.read(sum(segment_table))
            chunks.extend((header, segment_table, payload))
            log.debug(
                'offset: 0x%x, header type: %s, page '
                'sequence number: %d, page segments: %d', offset,
                bin(header_type), page_sequence, page_segments)
            if state.end_of_file:
                written += 1
                with open(join(outdir, f'{written:05d}.ogg'),
                          'wb') as outfile:
                    outfile.writelines(chunks)
                chunks.clear()
                state.reset()
    if chunks:
        log.warning(
            'discarding %d trailing bytes not terminated by an '
            'end-of-stream page', sum(len(chunk) for chunk in chunks))
    return written
if __name__ == '__main__':
    # Exit with a usage message instead of an IndexError traceback when the
    # input path is missing.
    if len(sys.argv) < 2:
        sys.exit(f'usage: {sys.argv[0]} INPUT_FILE')
    extract_concatenated_oggs(sys.argv[1])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment