Last active
March 18, 2022 16:09
-
-
Save SeanPesce/ef2dd86a0204bcdfe8b42b9cd233ef30 to your computer and use it in GitHub Desktop.
Deus Ex: Mankind Divided (DXMD) .archive file extractor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Author: Sean Pesce | |
""" | |
The classes in this file can be used to extract files from the *.archive files used by DXMD. | |
Extraction of files that span multiple archives is also supported. | |
""" | |
import logging | |
import os | |
import sys | |
from ctypes import c_byte, c_char, c_uint32, c_uint64 | |
from ctypes_util import StructLE | |
# Maximum number of bytes requested from an archive in a single read() call
# while copying chunk data, so large chunks are streamed rather than loaded
# into memory all at once.
READ_SIZE = 50 * 1024 * 1024  # 50MB
# @TODO: Convert code to use lib.file_util | |
class ArchiveString:
    """Length-prefixed, null-terminated string as stored in an archive.

    Serialized layout:
        * unsigned integer, ``STR_LEN_WIDTH`` bytes wide: length of the
          encoded string in bytes (terminator not included)
        * the encoded string bytes
        * a single ``0x00`` terminator byte
    """

    # Size of the unsigned integer that stores the string length
    STR_LEN_WIDTH = 4

    def __init__(self, value=None, endianness='little', encoding='utf8'):
        """Create a string wrapper.

        Args:
            value: The decoded text, or None if not yet populated.
            endianness: Byte order of the length prefix ('little' or 'big').
            encoding: Text encoding used for the string bytes.
        """
        self.endianness = endianness
        self.encoding = encoding
        self.value = value

    @classmethod
    def from_fd(cls, fd):
        """Read one string from the current position of a binary file object.

        Args:
            fd: Object with a ``read(n)`` method returning bytes.

        Returns:
            A new instance of ``cls`` holding the decoded text.

        Raises:
            AssertionError: If the stream ends early or the terminator byte
                is not ``0x00``.
        """
        # Use cls() (not ArchiveString()) so subclasses get their own type back
        s = cls()
        # Read string length
        data = fd.read(cls.STR_LEN_WIDTH)
        assert len(data) == cls.STR_LEN_WIDTH, f'Expected {cls.STR_LEN_WIDTH} bytes, received {len(data)}'
        str_len = int.from_bytes(data, s.endianness, signed=False)
        # Read string value
        data = fd.read(str_len)
        assert len(data) == str_len, f'Expected {str_len} bytes, received {len(data)}'
        s.value = data.decode(s.encoding)
        # Consume null terminator
        data = fd.read(1)
        assert data == b'\x00', f'Expected null terminator, received {data}'
        return s

    def __repr__(self):
        return f'{type(self).__name__}("{self.value}")'

    def __str__(self):
        return self.value

    def __bytes__(self):
        """Serialize to the on-disk form that ``from_fd`` reads back."""
        data = self.value.encode(self.encoding)
        # The prefix counts encoded BYTES, not characters: from_fd() reads
        # exactly that many bytes, so len(self.value) would be wrong for any
        # text containing multi-byte characters.
        prefix = len(data).to_bytes(self.STR_LEN_WIDTH, self.endianness, signed=False)
        return prefix + data + b'\x00'
class FileChunk(StructLE):
    # Fixed-size record describing one contiguous piece of an archived file:
    # which archive holds the bytes, where they belong in the original file,
    # where they sit inside the archive, and how many bytes there are.
    _fields_ = [
        ('archive', c_uint32),  # Archive link index (archive file containing this chunk)
        ('begin', c_uint64),  # Offset in original source file
        ('offset', c_uint64),  # Offset in archive file
        ('length', c_uint64),  # Number of bytes in this chunk
    ]
class InnerFileHeader(StructLE):
    # Fixed-size record that precedes each archived file's name in the
    # directory section (read first by InnerFile.from_fd).
    _fields_ = [
        ('timestamp', c_uint64),  # NOTE(review): units/epoch not established by this file
        ('unk1', c_byte * 16),  # Some kind of hash? (unknown 16-byte field)
    ]
class InnerFile:
    """Directory entry for one file stored in an archive.

    An entry consists of a fixed header, the file's name, a 4-byte chunk
    count, and that many ``FileChunk`` records.
    """

    def __init__(self, endianness='little'):
        """Create an empty entry; ``from_fd`` populates the fields."""
        self.endianness = endianness
        self.header = None
        self._name = None
        self.chunk_count = None
        self.chunks = None

    @classmethod
    def from_fd(cls, fd):
        """Read one directory entry from the current position of ``fd``.

        Raises:
            AssertionError: If the chunk count or a chunk record is truncated.
        """
        entry = cls()
        entry.header = InnerFileHeader.from_bytes(fd.read(InnerFileHeader.sizeof()))
        entry._name = ArchiveString.from_fd(fd)

        raw_count = fd.read(4)
        assert len(raw_count) == 4, f'Expected {4} bytes, received {len(raw_count)}'
        entry.chunk_count = int.from_bytes(raw_count, entry.endianness, signed=False)

        entry.chunks = []
        pending = entry.chunk_count
        while pending > 0:
            record_sz = FileChunk.sizeof()
            raw_chunk = fd.read(record_sz)
            assert len(raw_chunk) == record_sz, f'Expected {record_sz} bytes, received {len(raw_chunk)}'
            entry.chunks.append(FileChunk.from_bytes(raw_chunk))
            pending -= 1
        return entry

    @property
    def total_size(self):
        """Sum of the lengths of all chunks."""
        return sum(piece.length for piece in self.chunks)

    @property
    def name(self):
        """The entry's name as a plain ``str``."""
        return str(self._name)

    def __repr__(self):
        summary = f'{type(self).__name__}\n{self.header}{self.name=}\n{self.total_size=}\n{self.chunk_count=}\nself.chunks=\n'
        chunk_text = ''.join(f' {c}'.replace('\n', '\n ') for c in self.chunks)
        return summary + chunk_text

    def __bytes__(self):
        """Serialize the entry in the same order ``from_fd`` reads it."""
        count_field = self.chunk_count.to_bytes(4, self.endianness, signed=False)
        chunk_bytes = b''.join(bytes(piece) for piece in self.chunks)
        return bytes(self.header) + bytes(self._name) + count_field + chunk_bytes
class ArchiveHeader(StructLE):
    """Fixed-size header found at the very start of an archive file."""

    _fields_ = [
        ('magic', c_char * 4),
        ('unk1', c_uint32),  # version ID?
        ('file_count', c_uint32),
        ('link_count', c_uint32),  # References to archive files
        ('dir_offset', c_uint64),  # Offset of inner file/chunk mapping section
    ]

    def validate(self):
        """Check the magic bytes.

        Raises:
            ValueError: If the magic field is not ``b'ARCH'``.
        """
        if self.magic == b'ARCH':
            return
        raise ValueError(f'Unrecognized magic bytes: {self.magic}')
class ArchiveFile:
    """Parsed view of an archive file and the archives it links to.

    The file is parsed on construction and its handle stays open so that
    ``extract`` can read chunk data later. Call ``close()`` when done, or use
    the instance as a context manager::

        with ArchiveFile(path) as archive:
            archive.extract_all(outdir)

    Attributes:
        fpath: Path of the archive (None creates an empty, unparsed object).
        fd: Open binary file object for ``fpath`` (None when closed).
        fsize: Size of the archive file in bytes.
        header: Parsed ``ArchiveHeader``.
        links: ``ArchiveString`` names of the linked archives, in index order.
        linked_archives: Map of link name -> ``ArchiveFile`` (``self`` for a
            link that resolves to this very file).
        files: Map of inner file name -> ``InnerFile``.
    """

    def __init__(self, file_path):
        self.fpath = file_path
        self.fd = None
        self.fsize = None
        self.header = None
        self.links = None
        self.linked_archives = None
        self.files = None
        self.parse()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close this archive's file handle and those of its linked archives.

        Safe to call more than once and on an unparsed instance.
        """
        if self.fd is not None:
            self.fd.close()
            self.fd = None
        for linked in (self.linked_archives or {}).values():
            # A link may point back at this archive; it is already closed
            if linked is not self:
                linked.close()

    def parse(self):
        """(Re-)parse the archive at ``self.fpath``; no-op if the path is None.

        Raises:
            ValueError: If the header's magic bytes are not recognized.
            AssertionError: If the directory is truncated, contains an
                unnamed or duplicate file, or is followed by unknown data.
        """
        if self.fpath is None:
            return
        logging.info(f'Parsing {self.fpath}')
        self.fsize = os.path.getsize(self.fpath)
        # Release handles from any previous parse (own and linked archives)
        self.close()
        self.links = []
        self.files = {}
        self.linked_archives = {}
        # Initialize file descriptor
        self.fd = open(self.fpath, 'rb')
        try:
            self._parse_contents()
        except Exception:
            # Do not leak open handles when the archive turns out to be invalid
            self.close()
            raise

    def _parse_contents(self):
        """Read header, link names, linked archives and inner-file entries."""
        data = self.fd.read(ArchiveHeader.sizeof())
        self.header = ArchiveHeader.from_bytes(data)
        self.fd.seek(self.header.dir_offset)
        # Parse link names
        for _ in range(self.header.link_count):
            self.links.append(ArchiveString.from_fd(self.fd))
        # Parse linked archives
        logging.info(f'Linked archives: {self.links}\n')
        full_path = os.path.abspath(self.fpath)
        archive_dir = os.path.dirname(full_path)
        for link in self.links:
            link_name = str(link)
            linked_archive_path = os.path.abspath(os.path.join(archive_dir, link_name))
            if linked_archive_path == full_path:
                self.linked_archives[link_name] = self
                continue
            # Parse archives that are a dependency of this archive
            self.linked_archives[link_name] = self.__class__(linked_archive_path)
        # Parse inner files
        for _ in range(self.header.file_count):
            inner_file = InnerFile.from_fd(self.fd)
            assert inner_file.name != '', f'Encountered unnamed archived file at offset: {self.fd.tell():#x}'
            assert inner_file.name not in self.files, f'Duplicate archived file: {inner_file.name}'
            self.files[inner_file.name] = inner_file
            logging.debug(inner_file)
        # Check for trailing data
        final_offset = self.fd.tell()
        trailing_data = self.fd.read()
        assert len(trailing_data) == 0, f'{len(trailing_data)} bytes of unknown data starting at offset {final_offset:#x}'

    def extract(self, inner_fname, outdir=''):
        """Write one archived file to ``outdir``.

        Args:
            inner_fname: Key into ``self.files``.
            outdir: Destination directory; missing directories (including any
                sub-directories in the inner file's name) are created.

        Raises:
            KeyError: If ``inner_fname`` is not in the archive.
            AssertionError: If chunks are not contiguous and in order, or a
                read/write transfers fewer bytes than expected.
        """
        inner_file = self.files[inner_fname]
        # NOTE(review): the inner name comes from the archive and is joined
        # as-is; an absolute or '..'-laden name would escape outdir.
        extract_fpath = os.path.join(outdir, inner_file.name)
        logging.info(f'Extracting {extract_fpath}')
        # open() does not create intermediate directories
        parent_dir = os.path.dirname(extract_fpath)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(extract_fpath, 'wb') as f_out:
            last_end = 0
            for chunk in inner_file.chunks:
                # Check that chunks are contiguous and in-order
                assert chunk.begin == last_end, 'Out-of-order chunks found but not supported. Contact the developer.'
                last_end = chunk.begin + chunk.length
                # Obtain handle to the archive file containing the chunk data
                container = self.linked_archives[str(self.links[chunk.archive])]
                # Save current file offset
                saved_fp = container.fd.tell()
                try:
                    # Read and write data in bounded pieces
                    container.fd.seek(chunk.offset)
                    remaining = chunk.length
                    while remaining > 0:
                        read_sz = min(remaining, READ_SIZE)
                        data = container.fd.read(read_sz)
                        assert len(data) == read_sz, f'Expected {read_sz} bytes, received {len(data)}'
                        n_written = f_out.write(data)
                        assert n_written == read_sz, f'Expected to write {read_sz} bytes, but wrote {n_written}'
                        remaining -= read_sz
                finally:
                    # Restore current file offset, even if the copy failed
                    container.fd.seek(saved_fp)
                assert container.fd.tell() == saved_fp, f'Bad file pointer offset: {container.fd.tell():#x}'

    def extract_all(self, outdir=''):
        """Extract every file listed in the archive into ``outdir``."""
        for f in self.files:
            self.extract(f, outdir)

    def list(self):
        """Print the name of every archived file, one per line."""
        for f in self.files:
            print(f)
if __name__ == '__main__':
    # Command line: <archive file> [output directory] [-v] [-l|--list]
    if len(sys.argv) < 2:
        # The usage text now also mentions the list option handled below
        print(f'Usage:\n {sys.argv[0]} <archive file> [output directory] [-v] [-l|--list]')
        sys.exit()

    # -v anywhere on the command line enables debug logging
    logging.basicConfig(level=logging.DEBUG if '-v' in sys.argv else logging.INFO)

    # The second positional argument is the output directory unless it is a flag
    OUT_DIR = ''
    if len(sys.argv) > 2 and not sys.argv[2].startswith('-'):
        OUT_DIR = sys.argv[2]

    ARCHIVE_FPATH = sys.argv[1]
    archive = ArchiveFile(ARCHIVE_FPATH)

    # List mode prints the archived file names and extracts nothing
    if '-l' in sys.argv or '--list' in sys.argv:
        archive.list()
        sys.exit()

    archive.extract_all(OUT_DIR)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import logging | |
from ctypes import Array, BigEndianStructure, LittleEndianStructure, sizeof | |
# Author: Jonathon Reinhart | |
# Source: https://gist.github.com/JonathonReinhart/b6f355f13021cd8ec5d0101e0e6675b2 | |
class StructHelper(object):
    """Mix-in that adds printing and (de)serialization helpers to ctypes structures.

    Intended to be combined with a ``ctypes`` structure base class that
    defines ``_fields_``.
    """

    def __get_value_str(self, name, fmt='{}'):
        """Format the value of field ``name`` for display.

        Arrays are shown as lists; integers are shown in hex (padded to 18
        columns) followed by the ``fmt``-formatted value in parentheses.
        """
        val = getattr(self, name)
        if isinstance(val, Array):
            val = list(val)
        elif isinstance(val, int):
            return f'{val:#x}'.ljust(18) + ' (' + fmt.format(val) + ')'
        return fmt.format(val)

    def __str__(self):
        """Return a multi-line listing with one aligned ``name: value`` row per field."""
        # default=0 keeps structures without any fields printable
        maxname = max((len(name) for name, type_, *sz_ in self._fields_), default=0)
        rows = ['{}:\n'.format(self.__class__.__name__)]
        for name, type_, *sz_ in self._fields_:
            rows.append(' {name:<{width}}: {value}'.format(
                name=name,
                width=maxname,
                value=self.__get_value_str(name),
            ))
            rows.append('\n')
        return ''.join(rows)

    def __repr__(self):
        return '{name}({fields})'.format(
            name=self.__class__.__name__,
            fields=', '.join(
                '{}={}'.format(name, self.__get_value_str(name, '{!r}'))
                for name, _, *sz_ in self._fields_
            ),
        )

    @classmethod
    def _typeof(cls, field):
        """Get the type of a field

        Example: A._typeof(A.fld)
        Inspired by stackoverflow.com/a/6061483

        Raises:
            KeyError: If ``field`` is not one of this class's fields.
        """
        for name, type_, *sz_ in cls._fields_:
            if getattr(cls, name) is field:
                return type_
        raise KeyError(field)

    @classmethod
    def read_from(cls, f):
        """Fill a new instance from ``f.readinto()``.

        Raises:
            EOFError: If fewer than ``sizeof(cls)`` bytes were read.
        """
        result = cls()
        if f.readinto(result) != sizeof(cls):
            raise EOFError
        return result

    def get_bytes(self):
        """Get raw byte string of this structure

        ctypes.Structure implements the buffer interface, so it can be used
        directly anywhere the buffer interface is implemented.
        https://stackoverflow.com/q/1825715
        """
        return bytearray(self)

    def validate(self):
        """Derived types can override this function to automatically throw errors if bad data is
        encountered after instantiating with from_bytes
        """
        return

    @classmethod
    def from_bytes(cls, buf):
        """Build an instance from a copy of ``buf``, validate it and log it at debug level.

        Raises whatever ``from_buffer_copy`` or ``validate`` raise.
        """
        inst = cls.from_buffer_copy(buf)
        inst.validate()
        logging.debug(inst)
        return inst

    @classmethod
    def sizeof(cls):
        """Return the size of the structure in bytes."""
        return sizeof(cls)
class StructLE(LittleEndianStructure, StructHelper):
    """Little endian structure class pre-configured for the majority of use-cases

    Combines ctypes' LittleEndianStructure with the StructHelper mix-in.
    """
    # Pack fields on 1-byte boundaries (no alignment padding between fields)
    _pack_ = 1
class StructBE(BigEndianStructure, StructHelper):
    """Big endian structure class pre-configured for the majority of use-cases

    Combines ctypes' BigEndianStructure with the StructHelper mix-in.
    """
    # Pack fields on 1-byte boundaries (no alignment padding between fields)
    _pack_ = 1
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment