Last active
April 27, 2022 07:32
-
-
Save ryancdotorg/b13be7ecf7999af46aa63759d275f56b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from sys import argv, stderr, stdout, version_info | |
from functools import partial | |
def eprint(*args, **kwargs):
    """Call print() with ``file`` defaulting to standard error.

    Every positional and keyword argument is forwarded to print()
    unchanged; an explicit ``file=`` keyword overrides the default.
    """
    kwargs.setdefault('file', stderr)
    return print(*args, **kwargs)
import io | |
import zlib | |
import struct | |
import argparse | |
from pathlib import Path | |
def _mk_is_nop(): | |
try: | |
_nop = (lambda: None).__code__.co_code | |
def _is_nop(maybe_nop): | |
code = getattr(maybe_nop, '__code__', None) or getattr(maybe_nop, '__func__', None) | |
return getattr(code, 'co_code', None) == _nop | |
return _is_nop | |
except AttributeError: | |
return lambda: False | |
_is_nop = _mk_is_nop() | |
def filectx(obj, *args):
    """Store a binary file object on ``obj`` and hook its ``__exit__`` to close it.

    Call as ``filectx(obj, filename)`` (stored under the attribute
    ``'fileobj'``) or ``filectx(obj, attr, filename)``.

    ``filename`` may be:
      * a ``str`` or ``pathlib.Path`` -- opened in ``'rb'`` mode,
      * ``bytes`` / ``bytearray`` -- wrapped in ``io.BytesIO``,
      * an ``io.IOBase`` instance or any object with a callable ``read`` --
        stored as is and never closed by this helper.

    When this function created the file object and ``obj`` has an
    ``__exit__`` attribute, ``obj.__exit__`` is replaced on the instance
    by a wrapper that calls the previous ``__exit__`` and then closes the
    file object.

    Raises:
        TypeError: wrong number of arguments, or unsupported ``filename``
            type.
    """
    nargs = len(args)
    if nargs == 0:
        raise TypeError(f'fileobj expected at least 1 argument, got {nargs}')
    elif nargs == 1:
        attr, filename = 'fileobj', args[0]
    elif nargs == 2:
        attr, filename = args
    else:
        raise TypeError(f'fileobj expected at most 2 arguments, got {nargs}')

    if isinstance(filename, str):
        handle = open(filename, 'rb')
    elif isinstance(filename, Path):
        handle = filename.open('rb')
    elif isinstance(filename, (bytes, bytearray)):
        handle = io.BytesIO(filename)
    elif isinstance(filename, io.IOBase) or callable(getattr(filename, 'read', None)):
        setattr(obj, attr, filename)
        # file-like objects passed in shouldn't be closed by the context manager
        return None
    else:
        raise TypeError(f"invalid type for file: '{type(filename).__name__}'")
    setattr(obj, attr, handle)

    # https://filippo.io/instance-monkey-patching-in-python/
    # NOTE(review): the ``with`` statement looks ``__exit__`` up on the type,
    # not the instance, so this per-instance hook only runs when
    # ``obj.__exit__(...)`` is called explicitly.
    if hasattr(obj, '__exit__'):
        from types import MethodType
        saved_exit = obj.__exit__

        def __exit__(self, *exc):
            # Close the handle opened above even if the previous exit
            # raises, and hand back the previous exit's return value.
            try:
                return saved_exit(*exc)
            finally:
                handle.close()

        obj.__exit__ = MethodType(__exit__, obj)
class file_property:
    """Descriptor whose assignments are routed through filectx().

    The value is kept on the owning instance under a randomly generated
    attribute name, so it does not collide with the public name.
    """

    def __init__(self, name):
        from uuid import uuid4
        self.name = name
        # Per-descriptor storage slot on the instance: '_' + 32 hex digits.
        self.symbol = '_' + uuid4().hex

    def __get__(self, obj, cls):
        # Class-level access hands back the descriptor itself.
        if obj is None:
            return self
        if not hasattr(obj, self.symbol):
            raise AttributeError(f"'{cls.__name__}' object has no attribute '{self.name}'")
        return getattr(obj, self.symbol)

    def __set__(self, obj, value):
        # filectx() converts the value to a file object and stores it under
        # the private slot name.
        filectx(obj, self.symbol, value)
class final_property:
    """Descriptor for an attribute that can be assigned exactly once per instance.

    The value is kept on the owning instance under a randomly generated
    attribute name.  Reading before the first assignment raises
    AttributeError, as does any second assignment.
    """

    def __init__(self, name):
        from uuid import uuid4
        # ``name`` is only used in error messages; ``symbol`` is the slot
        # on the instance that actually holds the value.
        self.name, self.symbol = name, '_' + uuid4().hex

    def __get__(self, obj, cls):
        if obj is None:
            return self
        elif hasattr(obj, self.symbol):
            return getattr(obj, self.symbol)
        else:
            raise AttributeError(f"'{cls.__name__}' object has no attribute '{self.name}'")

    def __set__(self, obj, value):
        if hasattr(obj, self.symbol):
            # Name the owning object's class, not the descriptor's class.
            raise AttributeError(
                f"attribute '{self.name}' of '{type(obj).__name__}' objects is final"
            )
        setattr(obj, self.symbol, value)
class unsupported_property:
    """Descriptor that raises OSError whenever it is read through an instance."""

    def __init__(self, name):
        # Only used to build the error message.
        self.name = name

    def __get__(self, obj, cls):
        if obj is not None:
            raise OSError(f'{self.name} not supported')
        # Class-level access returns the descriptor itself.
        return self
class RollingHashed(io.IOBase):
    """Read-only stream wrapper that ends reads at rolling-hash boundaries.

    Every byte returned by read() is folded into a small hash
    (``hash = ((hash << 1) ^ byte) & mask``).  When the hash equals
    ``hit`` the current read stops right after that byte, and
    ``callback()`` is invoked at the start of the next read() (or in
    close()).

    Args:
        filename: anything accepted by filectx(): a path, bytes, or a
            file-like object.
        callback: zero-argument callable invoked after each boundary.
        bits: width of the hash in bits; ``mask`` is ``(1 << bits) - 1``.
        block_size: bytes requested from the underlying file by an
            unsized read(); defaults to ``2 ** (bits + 4)``.
        callback_on_eof: when true, ``callback()`` is also invoked each
            time read() finds no data left.
    """

    bits = final_property('bits')
    mask = final_property('mask')
    callback = final_property('callback')
    fileobj = file_property('fileobj')

    def __init__(self, filename, callback, *, bits=12, block_size=None, callback_on_eof=False):
        self.fileobj = filename
        self.callback = callback
        self.callback_on_eof = callback_on_eof
        self.block_size = 2 ** (bits + 4) if block_size is None else block_size
        self.bits = bits
        self.mask = (1 << bits) - 1
        self.hit = self.mask >> 1
        # state
        self.to_flush = False   # a boundary was hit; callback still pending
        self.hash = self.hit
        self.buf = b''          # bytes fetched from fileobj but not yet returned
        self.pos = 0            # total number of bytes returned so far

    def read(self, *args):
        """Return the next run of bytes, ending early at a hash boundary.

        ``read()``, ``read(None)`` and ``read(-1)`` top the internal
        buffer up by ``block_size`` bytes when it holds less than that,
        and return everything up to and including the next boundary byte,
        or the whole buffer when there is none.  ``read(size)`` behaves
        the same but never returns more than ``size`` bytes.

        An empty result is returned only when no data is left (or for
        ``read(0)``), so callers may treat ``b''`` as end of file.

        Raises:
            TypeError: more than one argument was passed.
        """
        nargs = len(args)
        if nargs > 1:
            raise TypeError(f'read expected at most 1 argument, got {nargs}')
        size = args[0] if nargs else None

        if self.to_flush:
            self.callback()
            self.to_flush = False

        if size is None or size < 0:
            if len(self.buf) < self.block_size:
                self.buf += self.fileobj.read(self.block_size)
            limit = len(self.buf)
        else:
            if size == 0:
                return b''
            needed = size - len(self.buf)
            if needed > 0:
                self.buf += self.fileobj.read(needed)
            # Leftover bytes from an earlier read may exceed ``size``.
            limit = min(size, len(self.buf))

        # end of file reached
        if len(self.buf) == 0:
            if self.callback_on_eof:
                self.callback()
            return b''

        state, mask, hit = self.hash, self.mask, self.hit
        for offset, value in enumerate(memoryview(self.buf)[:limit]):
            state = ((state << 1) ^ value) & mask
            if state == hit:
                self.hash = state
                self.to_flush = True
                # Consume the boundary byte too: the result is never empty
                # and no byte is hashed twice.
                return self._shift(offset + 1)
        self.hash = state
        return self._shift(limit)

    def _shift(self, count=None):
        """Remove and return the first ``count`` buffered bytes (all if omitted)."""
        if count is None:
            head, tail = self.buf, b''
        else:
            head, tail = self.buf[:count], self.buf[count:]
        self.buf = tail
        self.pos += len(head)
        return head

    def close(self):
        """Run a pending callback, close the underlying file, mark self closed.

        Calling close() again afterwards does nothing.
        """
        if self.closed:
            return
        try:
            if self.to_flush:
                self.to_flush = False
                self.callback()
            self.fileobj.close()
        finally:
            super().close()

    def tell(self):
        """Return the number of bytes handed out by read() so far."""
        return self.pos

    def readable(self):
        return True

    def seekable(self):
        return False

    def writable(self):
        return False

    # Original (misspelled) name, kept for existing callers.
    writeable = writable

    write = unsupported_property('write')
    fileno = unsupported_property('fileno')
    truncate = unsupported_property('truncate')

    def __enter__(self): return self

    # Deliberately an empty body: filectx() layers a closing __exit__ on
    # top of this one on the instance.
    def __exit__(self, *exc): pass
def _little_endian(fmt):
    """Return ``fmt`` with '<' prepended unless it already has a byte-order prefix."""
    return fmt if fmt[:1] in ('@', '=', '<', '>', '!') else '<' + fmt


# https://satisfactory.fandom.com/wiki/Save_files#Save_File_Format
class SatisfactorySaveFileReader:
    """Reader for the header and zlib chunks of a Satisfactory save file.

    Constructing an instance parses the header into ``self.header`` (a
    dict) and stores its serialized bytes in ``self.raw_header``; the
    underlying file is then positioned at the first chunk.

    Args:
        filename: anything accepted by filectx(): a path, bytes, or a
            file-like object.
    """

    fileobj = file_property('fileobj')

    def __init__(self, filename):
        self.fileobj = filename
        self.header = self._read_header()
        if self.fileobj.seekable():
            # Re-read the exact header bytes, ending where parsing stopped.
            size = self.fileobj.tell()
            self.fileobj.seek(0)
            self.raw_header = self.fileobj.read(size)
        else:
            # Cannot rewind: rebuild the bytes from the parsed fields.
            # NOTE(review): an empty string is re-encoded as a lone
            # terminator, so this can differ from the bytes that were read.
            self.raw_header = self.serialize_header(self.header)

    def __enter__(self): return self

    # Deliberately an empty body: filectx() layers a closing __exit__ on
    # top of this one on the instance.
    def __exit__(self, *exc): pass

    def tell(self):
        """Return the current position of the underlying file object."""
        return self.fileobj.tell()

    @staticmethod
    def _mkread(fmt):
        """Build a reader method for the struct format ``fmt``.

        The method reads ``struct.calcsize(fmt)`` bytes from
        ``self.fileobj``.  A single-character format returns the bare
        value; any other format returns a tuple.  Formats without a
        byte-order prefix are read little-endian.
        """
        packer = struct.Struct(_little_endian(fmt))
        single = len(fmt.lstrip('@=<>!')) == 1

        def reader(self):
            values = packer.unpack(self.fileobj.read(packer.size))
            return values[0] if single else values

        return reader

    def _read_struct(self, fmt):
        """Read and unpack one ``fmt`` record; always returns a tuple."""
        fmt = _little_endian(fmt)
        return struct.unpack(fmt, self.fileobj.read(struct.calcsize(fmt)))

    def _read_string(self):
        """Read a length-prefixed, null-terminated string.

        A positive length counts bytes, terminator included, and is
        decoded with the default codec (UTF-8).  A negative length is
        treated as a UTF-16-LE string whose absolute value counts 16-bit
        units, terminator included.
        NOTE(review): the UTF-16 convention is the Unreal Engine FString
        layout as I understand it -- confirm against the format
        description.

        Raises:
            RuntimeError: the terminator is missing.
        """
        length = self._read_int32()
        if length < 0:
            # read() treats a negative size as "read everything", so the
            # length must never be passed through unchanged.
            raw = self.fileobj.read(-2 * length)
            if raw[-2:] != b'\0\0':
                raise RuntimeError('String not null terminated')
            return raw[:-2].decode('utf-16-le')
        raw = self.fileobj.read(length)
        if len(raw) == 0:
            return ''
        if raw[-1] != 0:
            raise RuntimeError('String not null terminated')
        return raw[:-1].decode()

    def _read_bytes(self, count=None):
        """Read ``count`` raw bytes; without ``count`` an int32 length is read first.

        Raises:
            RuntimeError: the byte count is negative.
        """
        if count is None:
            count = self._read_int32()
        if count < 0:
            # A negative size would make read() return the rest of the file.
            raise RuntimeError(f'Invalid byte count: {count}')
        return self.fileobj.read(count)

    def _read_array(self, fmt):
        """Read an int32 element count, then that many ``fmt`` records.

        Returns a list of tuples.  As with the other readers, formats
        without a byte-order prefix are read little-endian.
        """
        count = self._read_int32()
        packer = struct.Struct(_little_endian(fmt))
        return [packer.unpack(self.fileobj.read(packer.size)) for _ in range(count)]

    def _read_header(self):
        """Parse the save header from offset 0 and return it as a dict.

        Raises:
            RuntimeError: the file is not at offset 0 and cannot seek.
        """
        if self.fileobj.tell() != 0:
            if not self.fileobj.seekable():
                raise RuntimeError(f"Cannot seek '{self.fileobj.__class__.__name__}' object")
            self.fileobj.seek(0)
        # Fields are read in file order; serialize_header() writes them in
        # the same order.
        d = {}
        d['header_version'] = self._read_int32()
        d['save_version'] = self._read_int32()
        d['build_version'] = self._read_int32()
        d['world_type'] = self._read_string()
        d['world_properties'] = self._read_string()
        d['session_name'] = self._read_string()
        d['play_time'] = self._read_int32()
        d['save_date'] = self._read_int64()
        d['session_visibility'] = self._read_byte()
        d['editor_object_version'] = self._read_int32()
        d['mod_metadata'] = self._read_bytes()
        d['mod_flags'] = self._read_int32()
        return d

    def _read_chunk(self):
        """Read the next chunk; return its fields as a dict, or None at end of file.

        The compressed payload is stored under ``'zlib'``.  A struct
        error while reading the first field is taken as end of file.
        """
        d = {}
        try:
            d['package_file_tag'] = self._read_int64()
        except struct.error:
            return None
        d['max_chunk_size'] = self._read_int64()
        d['compressed_length'] = self._read_int64()
        d['uncompressed_length'] = self._read_int64()
        d['compressed_length2'] = self._read_int64()
        d['uncompressed_length2'] = self._read_int64()
        d['zlib'] = self._read_bytes(d['compressed_length'])
        return d

    @staticmethod
    def serialize_chunk(compressed, *, package_file_tag=2653586369,
                        max_chunk_size=131072, uncompressed_length=None):
        """Return chunk header plus payload for one zlib-compressed chunk.

        Args:
            compressed: the zlib stream for the chunk.
            package_file_tag: first header field (default 0x9E2A83C1).
            max_chunk_size: second header field.
            uncompressed_length: size of the decompressed data; when
                omitted it is measured by decompressing ``compressed``.
        """
        clen = len(compressed)
        ulen = uncompressed_length
        if ulen is None:
            ulen = len(zlib.decompress(compressed))
        # Both lengths appear twice in the header, in this order.
        header = struct.pack(
            '<qqqqqq',
            package_file_tag, max_chunk_size,
            clen, ulen, clen, ulen,
        )
        return header + compressed

    @staticmethod
    def serialize_string(s):
        """Encode ``s`` as a length-prefixed, null-terminated string.

        ASCII text is written as bytes with a positive length.  Other
        text is written as UTF-16-LE with a negative length counting
        16-bit units, the form _read_string() accepts for negative
        lengths.
        """
        try:
            raw = s.encode('ascii') + b'\0'
        except UnicodeEncodeError:
            raw = s.encode('utf-16-le') + b'\0\0'
            return struct.pack('<i', -(len(raw) // 2)) + raw
        return struct.pack('<i', len(raw)) + raw

    @classmethod
    def serialize_header(cls, header):
        """Serialize a header dict (as returned by _read_header()) to bytes."""
        return b''.join((
            struct.pack(
                '<iii',
                header['header_version'], header['save_version'], header['build_version'],
            ),
            cls.serialize_string(header['world_type']),
            cls.serialize_string(header['world_properties']),
            cls.serialize_string(header['session_name']),
            struct.pack(
                '<iqBi',
                header['play_time'], header['save_date'],
                header['session_visibility'], header['editor_object_version'],
            ),
            struct.pack('<i', len(header['mod_metadata'])),
            header['mod_metadata'],
            struct.pack('<i', header['mod_flags']),
        ))


# convenience readers for the fixed-width little-endian integer fields
SatisfactorySaveFileReader._read_byte = SatisfactorySaveFileReader._mkread('B')
SatisfactorySaveFileReader._read_int32 = SatisfactorySaveFileReader._mkread('i')
SatisfactorySaveFileReader._read_int64 = SatisfactorySaveFileReader._mkread('q')
class Chunker:
    """Group incoming data into independently zlib-compressed chunks.

    Data passed to add() is compressed into the current chunk.  When a
    buffer would push the chunk's uncompressed size past
    ``max_chunk_size`` the chunk is finished first; a buffer that is too
    large on its own is cut into ``max_chunk_size`` pieces.  Each
    finished chunk is handed to ``callback`` as a bytearray holding a
    complete zlib stream.

    Args:
        callback: one-argument callable receiving each finished chunk.
        max_chunk_size: upper bound on a chunk's uncompressed size.
        **kwargs: passed to ``zlib.compressobj``; ``level`` defaults to 9.

    Raises:
        ValueError: ``max_chunk_size`` is not positive.
    """

    def __init__(self, callback, *, max_chunk_size=131072, **kwargs):
        if max_chunk_size <= 0:
            # A non-positive limit could never be satisfied by add().
            raise ValueError(f'max_chunk_size must be positive, got {max_chunk_size}')
        self.callback = callback
        kwargs.setdefault('level', 9)
        self.max_chunk_size = max_chunk_size
        self._kwargs = kwargs
        self.reset()

    def reset(self):
        """Start a new, empty chunk with a fresh compressor."""
        self.cmpr = zlib.compressobj(**self._kwargs)
        self.compressed = bytearray(b'')
        self.ulen = 0  # uncompressed bytes fed into the current chunk

    def finish(self, buf=b''):
        """Compress ``buf`` into the current chunk, emit it, and start anew.

        Nothing is emitted when the chunk holds no data.
        """
        if len(buf):
            self.compressed += self.cmpr.compress(buf)
            self.ulen += len(buf)
        if self.ulen > 0:
            self.compressed += self.cmpr.flush(zlib.Z_FINISH)
            self.callback(self.compressed)
        self.reset()

    def flush(self):
        """Insert a zlib full-flush point into the current chunk."""
        self.compressed += self.cmpr.flush(zlib.Z_FULL_FLUSH)

    def add(self, buf):
        """Compress ``buf`` into the current chunk, emitting chunks as they fill.

        A chunk that already holds data is never topped up with part of
        ``buf``: it is finished as is and ``buf`` starts the next chunk.
        """
        # A memoryview keeps the repeated slicing below copy-free.
        view = memoryview(buf)
        limit = self.max_chunk_size
        # Iterative, so input many chunks long cannot hit the recursion limit.
        while self.ulen + len(view) > limit:
            if self.ulen:
                self.finish()
            else:
                self.finish(view[:limit])
                view = view[limit:]
        self.compressed += self.cmpr.compress(view)
        self.ulen += len(view)
def main():
    """Recompress the save file named by ``argv[1]`` and write it to stdout.

    The header bytes are copied through unchanged.  Every chunk is
    decompressed, and the combined data is recompressed through
    RollingHashed and Chunker, with a zlib full flush at each
    rolling-hash boundary.  The parsed header is printed to stderr.
    """
    import os, sys

    if len(argv) < 2:
        eprint(f"usage: {argv[0] if argv else 'script'} SAVEFILE > OUTPUT")
        sys.exit(2)

    uncompressed = bytearray(b'')

    # Open the input here so it is closed on exit; the reader never closes
    # file objects that are handed to it.
    with open(argv[1], 'rb') as savefile, \
            os.fdopen(sys.stdout.fileno(), "wb", closefd=False) as out:
        with SatisfactorySaveFileReader(savefile) as s:
            out.write(s.raw_header)
            eprint(s.header)

            # Inflate every chunk into one contiguous buffer.
            while True:
                chunk = s._read_chunk()
                if chunk is None:
                    break
                uncompressed += zlib.decompress(chunk['zlib'])

        def write_chunk(compressed):
            out.write(SatisfactorySaveFileReader.serialize_chunk(compressed))

        c = Chunker(write_chunk, level=9)

        with RollingHashed(uncompressed, c.flush, bits=13, block_size=10000000) as rh:
            while True:
                buf = rh.read()
                c.add(buf)
                if len(buf) == 0:
                    break

        # Emit whatever is left in the final, partially filled chunk.
        c.finish()
        out.flush()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment