Skip to content

Instantly share code, notes, and snippets.

@Tatsh
Created May 31, 2020 04:54
Show Gist options
  • Save Tatsh/c3adffdb12d5f2cc064caa04c63fb52e to your computer and use it in GitHub Desktop.
Save Tatsh/c3adffdb12d5f2cc064caa04c63fb52e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from dataclasses import dataclass
from os.path import isdir, join
from struct import unpack
from typing import Optional, Tuple
import logging
import os
import sys
# Names exported by a star-import of this module.
__all__ = ('extract_concatenated_oggs', )
# Values of the header-type byte of an OGG page that the extractor
# distinguishes (the byte is a set of bit flags; see RFC 3533).
TYPE_FREE = 0x00  # no flag bits set
TYPE_CONTINUE = 0x01
TYPE_BOS = 0x02  # the only type accepted for the first page of a stream
TYPE_EOS = 0x04  # a page of this type completes the stream
TYPE_EOS_2 = TYPE_EOS | TYPE_CONTINUE  # 0x05; handled the same as TYPE_EOS
@dataclass
class ReadState:
    """
    Mutable parser state for the OGG stream currently being extracted.

    Every attribute is annotated so that the dataclass treats it as a
    field; an un-annotated class attribute would be left out of the
    generated ``__init__``, ``__repr__`` and ``__eq__``.
    """
    # Header types the next page is allowed to carry.
    expected_type: Tuple[int, ...] = (TYPE_BOS, )
    # Sequence number of the most recent page; None before the first page.
    last_page_sequence: Optional[int] = None
    # Raw bytes collected for the current stream. Declared after the two
    # fields above so positional construction keeps its original order.
    contents: bytes = b''
    # True once a page with an end-of-stream header type has been read.
    end_of_file: bool = False

    def expect_end_or_continue(self) -> None:
        """
        Allow every header type except beginning-of-stream on the next page.
        """
        self.expected_type = (TYPE_EOS, TYPE_EOS_2, TYPE_CONTINUE, TYPE_FREE)

    def expect_beginning(self) -> None:
        """
        Require the next page to be a beginning-of-stream page.
        """
        self.expected_type = (TYPE_BOS, )

    def reset(self) -> None:
        """
        Restore every attribute to its initial value, ready for a new stream.
        """
        self.contents = b''
        self.end_of_file = False
        self.expected_type = (TYPE_BOS, )
        self.last_page_sequence = None
def extract_concatenated_oggs(input_file: str, outdir: str = './out') -> int:
    """
    Searches a file of concatenated OGG files and extracts them into
    numbered files into outdir.

    Each stream is written to outdir as 00001.ogg, 00002.ogg and so on,
    as soon as a page with an end-of-stream header type has been read.
    outdir is created (including missing parents) when it does not exist.
    Bytes that are not part of a page are skipped and are never copied
    into an output file. A page cut short by the end of the input stops
    the scan; the stream it belongs to is not written.

    Raises AssertionError when a page carries a header type that is not
    allowed at that position, or when page sequence numbers within a
    stream are not consecutive.

    Returns the number of files written.
    """
    log = logging.getLogger('extractor')
    state = ReadState()
    written = 0
    # Whole pages of the stream in progress. They are joined once when the
    # stream is complete so a large stream is not re-copied for every page.
    pages = []
    os.makedirs(outdir, exist_ok=True)
    with open(input_file, 'rb') as f:
        while True:
            offset = f.tell()
            magic = f.read(4)
            if len(magic) < 4:
                break
            if magic != b'OggS':
                # Not at a page boundary: step a single byte so that a
                # capture pattern at any alignment is still found.
                f.seek(offset + 1)
                continue
            # Rest of the fixed header: version (1), header type (1),
            # granule position (8), serial number (4), page sequence (4),
            # checksum (4), segment count (1).
            header = f.read(23)
            if len(header) < 23:
                log.warning('offset: 0x%x, truncated page header', offset)
                break
            header_type = header[1]
            # Raised explicitly (not via ``assert``) so the checks survive
            # ``python -O`` while callers still see AssertionError.
            if header_type not in state.expected_type:
                raise AssertionError(
                    f'offset 0x{offset:x}: unexpected header type '
                    f'{header_type}, expected one of {state.expected_type}')
            page_sequence = unpack('<I', header[14:18])[0]
            if (state.last_page_sequence is not None
                    and page_sequence != state.last_page_sequence + 1):
                raise AssertionError(
                    f'offset 0x{offset:x}: page sequence {page_sequence} '
                    f'does not follow {state.last_page_sequence}')
            page_segments = header[22]
            # The segment table holds one length byte per segment; the
            # page payload is the sum of those lengths.
            segment_table = f.read(page_segments)
            payload_size = sum(segment_table)
            payload = f.read(payload_size)
            if (len(segment_table) < page_segments
                    or len(payload) < payload_size):
                log.warning('offset: 0x%x, truncated page body', offset)
                break
            state.last_page_sequence = page_sequence
            is_last_page = header_type in (TYPE_EOS, TYPE_EOS_2)
            if is_last_page:
                state.end_of_file = True
                state.expect_beginning()
            else:
                state.expect_end_or_continue()
            pages.append(magic + header + segment_table + payload)
            log.debug(
                'offset: 0x%x, header type: %s, page '
                'sequence number: %d, page segments: %d', offset,
                bin(header_type), page_sequence, page_segments)
            if state.end_of_file:
                state.contents = b''.join(pages)
                with open(join(outdir, f'{written + 1:05d}.ogg'),
                          'wb') as outfile:
                    outfile.write(state.contents)
                written += 1
                pages.clear()
                state.reset()
    if pages:
        log.warning('discarding %d page(s) of a stream without an '
                    'end-of-stream page', len(pages))
    return written
if __name__ == '__main__':
    # The input path is mandatory; exit with a usage message (status 1)
    # instead of an IndexError traceback when it is missing.
    if len(sys.argv) < 2:
        sys.exit(f'Usage: {sys.argv[0]} INPUT_FILE')
    extract_concatenated_oggs(sys.argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment