Skip to content

Instantly share code, notes, and snippets.

@dmitmel
Last active September 23, 2021 18:06
Show Gist options
  • Select an option

  • Save dmitmel/609a635cc849bd5dea9f8f7c48e23685 to your computer and use it in GitHub Desktop.

Select an option

Save dmitmel/609a635cc849bd5dea9f8f7c48e23685 to your computer and use it in GitHub Desktop.
Prototype unpacker utility for Eastward's `.g` archives
#!/usr/bin/env python3
# Based on research from <https://zenhax.com/viewtopic.php?f=9&t=15820#p66444>
from __future__ import annotations
import argparse
import os
import struct
import sys
from enum import Enum
from typing import Callable, Final, Iterable, Iterator, List, Sized
import zstandard as zstd
# Magic bytes expected at the very start of an archive; compared against the
# first bytes of the file by EwaArchive.read_header().
EWA_MAGIC: Final[bytes] = b'\x37\x6a\x00\x00'
# Archive header that follows the magic: a single little-endian unsigned
# 32-bit integer, which read_header() stores as the number of entries.
_STRUCT_HEADER = '<L'
_STRUCT_HEADER_SIZE = struct.calcsize(_STRUCT_HEADER)
# Fixed-size part of one index record (it follows the NUL-terminated entry
# name): four little-endian unsigned 32-bit integers, unpacked by
# read_header() as (offset, compress_type, real_size, size).
_STRUCT_ENTRY = '<LLLL'
_STRUCT_ENTRY_SIZE = struct.calcsize(_STRUCT_ENTRY)
class EwaCompressType(Enum):
    """Compression method identifier read from each archive index record."""

    # Entry bytes are written out unchanged by the extract command.
    STORE = 0
    # Entry bytes are fed through a zstd decompressor by the extract command.
    ZSTD = 2
class EwaEntry(Sized):
    """One file record from an archive's index.

    Attributes are plain fields that ``EwaArchive.read_header`` fills in:

    * ``archive`` -- the archive object whose ``data`` holds the payload.
    * ``name`` -- the entry name as raw bytes.
    * ``offset`` / ``size`` -- position and length of the stored payload
      inside ``archive.data``.
    * ``compress_type`` -- an ``EwaCompressType`` member.
    * ``real_size`` -- third numeric field of the index record (presumably the
      uncompressed size -- TODO confirm against the format notes).
    """

    def __init__(self, archive: EwaArchive, name: bytes) -> None:
        """Create an entry called *name* belonging to *archive*.

        All numeric fields start at zero and the compression type at
        ``STORE``; the caller is expected to overwrite them.
        """
        self.name = name
        self.archive = archive
        self.compress_type = EwaCompressType.STORE
        self.offset = self.size = self.real_size = 0

    def __len__(self) -> int:
        """Return the stored (on-disk) size of the entry in bytes."""
        return self.size

    def get_raw_data(self) -> bytes:
        """Return the entry's stored bytes exactly as they sit in the archive."""
        start = self.offset
        end = start + self.size
        return self.archive.data[start:end]
class EwaArchive(Sized, Iterable[EwaEntry]):
    """An archive held fully in memory, plus the index parsed out of it.

    Construct it with the complete file contents, then call
    ``read_header()`` once to populate ``entries``.  Iterating the archive
    yields its ``EwaEntry`` objects; ``len()`` gives the entry count read
    from the header.
    """

    def __init__(self, data: bytes) -> None:
        """Wrap *data* (the whole archive file) without parsing anything yet."""
        self.entries: List[EwaEntry] = []
        self.entries_count = 0
        # Read cursor into ``data`` used while parsing the index.
        self.offset = 0
        self.data = data

    def __iter__(self) -> Iterator[EwaEntry]:
        """Iterate over the entries parsed so far, in index order."""
        return iter(self.entries)

    def __len__(self) -> int:
        """Return the entry count announced by the archive header."""
        return self.entries_count

    def _take_data(self, size: int) -> bytes:
        """Return the next *size* bytes at the cursor and advance past them.

        The cursor always moves by *size*, even if fewer bytes were available.
        """
        start, end = self.offset, self.offset + size
        chunk = self.data[start:end]
        self.offset = end
        return chunk

    def read_header(self) -> None:
        """Parse the magic, the entry count and every index record.

        Raises:
            Exception: the data does not start with ``EWA_MAGIC``.
            IndexError: an entry's offset or end lies beyond the data.
            ValueError: an entry name has no NUL terminator, or the
                compression type is not a known ``EwaCompressType`` value.
            struct.error: the data ends in the middle of a fixed-size field.
        """
        magic = self._take_data(len(EWA_MAGIC))
        if magic != EWA_MAGIC:
            raise Exception('magic does not match!')

        header = self._take_data(_STRUCT_HEADER_SIZE)
        (self.entries_count,) = struct.unpack(_STRUCT_HEADER, header)

        total_size = len(self.data)
        for _ in range(self.entries_count):
            # Names are NUL-terminated: consume through the terminator, then
            # drop it from the stored name.
            terminator_pos = self.data.index(0, self.offset)
            name = self._take_data(terminator_pos - self.offset + 1)[:-1]

            record = self._take_data(_STRUCT_ENTRY_SIZE)
            offset, compress_type, real_size, size = struct.unpack(_STRUCT_ENTRY, record)

            if offset > total_size:
                raise IndexError('entry offset out of bounds!')
            if offset + size > total_size:
                raise IndexError('entry end out of bounds!')

            entry = EwaEntry(self, name)
            entry.offset = offset
            entry.size = size
            entry.compress_type = EwaCompressType(compress_type)
            entry.real_size = real_size
            self.entries.append(entry)
def main(cmd_argv: List[str]) -> int:
    """Parse the command line and run the selected sub-command.

    Args:
        cmd_argv: Full argument vector laid out like ``sys.argv``: the
            program name at index 0, followed by the actual arguments.

    Returns:
        The exit status returned by the sub-command handler.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(required=True, metavar='COMMAND')

    # argparse expects a sequence of alias names, so pass lists rather than
    # relying on a one-character string happening to iterate correctly.
    subparser = subparsers.add_parser('list', aliases=['l'], help='')
    subparser.set_defaults(func=cmd_list)
    subparser.add_argument('archive')
    subparser.add_argument('--verbose', '-v', action='store_true')

    subparser = subparsers.add_parser('extract', aliases=['x'], help='')
    subparser.set_defaults(func=cmd_extract)
    subparser.add_argument('archive')
    subparser.add_argument('--verbose', '-v', action='store_true')
    subparser.add_argument('--output', '-o')

    subparser = subparsers.add_parser('create', aliases=['c'], help='')
    subparser.set_defaults(func=cmd_create)
    subparser.add_argument('archive')
    subparser.add_argument('files', nargs='*')
    subparser.add_argument('--verbose', '-v', action='store_true')

    # Parse the vector we were given (minus the program name) instead of
    # letting argparse silently fall back to the global sys.argv.
    args = parser.parse_args(cmd_argv[1:])
    handler: Callable[[argparse.Namespace], int] = args.func
    return handler(args)
def cmd_list(args: argparse.Namespace) -> int:
    """Print the names of all entries in ``args.archive``.

    With ``args.verbose`` set, a table with each entry's offset (hex) and
    stored size is printed instead of the bare names.  Names are decoded as
    UTF-8 with undecodable bytes replaced.

    Returns:
        Always 0.
    """
    with open(args.archive, 'rb') as archive_file:
        archive = EwaArchive(archive_file.read())
        archive.read_header()

        if not args.verbose:
            for entry in archive:
                print(entry.name.decode('utf8', 'replace'))
            return 0

        print(' offset size name')
        print(' -------------------------------')
        row_template = ' {:08x} {:<8} {}'
        for entry in archive:
            display_name = entry.name.decode('utf8', 'replace')
            print(row_template.format(entry.offset, entry.size, display_name))
    return 0
def _resolve_entry_path(output_root: bytes, entry_name: bytes) -> bytes:
    """Return the path to write *entry_name* to, refusing paths outside *output_root*.

    Entry names come straight from the archive, so an absolute name or one
    containing ``..`` components could otherwise escape the output directory.

    Raises:
        ValueError: the resolved path is not located inside *output_root*.
    """
    entry_path = os.path.join(output_root, entry_name)
    real_root = os.path.realpath(output_root)
    real_path = os.path.realpath(entry_path)
    try:
        is_inside = os.path.commonpath([real_root, real_path]) == real_root
    except ValueError:
        # commonpath() itself rejects e.g. paths on different Windows drives.
        is_inside = False
    if not is_inside:
        raise ValueError(
            'entry path escapes the output directory: {}'.format(
                entry_name.decode('utf8', 'replace')
            )
        )
    return entry_path


def cmd_extract(args: argparse.Namespace) -> int:
    """Extract every entry of ``args.archive`` into the output directory.

    The output directory is ``args.output`` when given, otherwise the current
    working directory.  Intermediate directories are created as needed.
    Entries of type ``STORE`` are written as-is; all others are written
    through a zstd decompressor.  With ``args.verbose`` set, each entry name
    is printed before it is extracted.

    Returns:
        Always 0.

    Raises:
        ValueError: an entry name would be written outside the output
            directory.
    """
    output_dir = os.fsencode(args.output) if args.output is not None else os.getcwdb()

    # The whole archive is read into memory, so the file handle can be
    # released before any extraction work starts.
    with open(args.archive, 'rb') as archive_file:
        archive = EwaArchive(archive_file.read())
    archive.read_header()

    comp_ctx = zstd.ZstdDecompressor()
    for entry in archive:
        if args.verbose:
            print(entry.name.decode('utf8', 'replace'))

        entry_full_path = _resolve_entry_path(output_dir, entry.name)
        entry_dir = os.path.dirname(entry_full_path)
        if entry_dir:
            os.makedirs(entry_dir, exist_ok=True)

        with open(entry_full_path, 'wb') as entry_file:
            if entry.compress_type == EwaCompressType.STORE:
                entry_file.write(entry.get_raw_data())
            else:
                # NOTE(review): write_size is given the *stored* size here, as
                # in the original code -- confirm this is the intended value.
                with comp_ctx.stream_writer(entry_file, write_size=entry.size) as entry_writer:
                    entry_writer.write(entry.get_raw_data())
                    entry_writer.flush()
    return 0
def cmd_create(args: argparse.Namespace) -> int:
    """Placeholder for the ``create`` sub-command, which is not implemented.

    The previous stub opened ``args.archive`` for writing (creating or
    truncating it) and then crashed while constructing the archive object.
    Until archive writing exists, report the situation on stderr and leave
    the target file untouched.

    Returns:
        Always 1, to signal failure to the calling shell.
    """
    print('error: the create command is not implemented yet', file=sys.stderr)
    return 1
if __name__ == '__main__':
    # Run the CLI and hand its return value to the interpreter as the
    # process exit status.
    raise SystemExit(main(sys.argv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment