Skip to content

Instantly share code, notes, and snippets.

@ryancdotorg
Last active April 27, 2022 07:32
Show Gist options
  • Save ryancdotorg/b13be7ecf7999af46aa63759d275f56b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from sys import argv, stderr, stdout, version_info
from functools import partial
eprint = partial(print, file=stderr)
import io
import zlib
import struct
import argparse
from pathlib import Path
def _mk_is_nop():
try:
_nop = (lambda: None).__code__.co_code
def _is_nop(maybe_nop):
code = getattr(maybe_nop, '__code__', None) or getattr(maybe_nop, '__func__', None)
return getattr(code, 'co_code', None) == _nop
return _is_nop
except AttributeError:
return lambda: False
_is_nop = _mk_is_nop()
def filectx(obj, *args):
    """Open/wrap a file source and store the file object on *obj*.

    Call as ``filectx(obj, filename)`` (stored under attribute 'fileobj')
    or ``filectx(obj, attr, filename)``.  The source may be a path (str or
    Path, opened 'rb'), a bytes/bytearray (wrapped in BytesIO), or an
    existing file-like object (stored as-is and never closed here).

    If *obj* has an ``__exit__`` method, it is monkey-patched so that
    leaving the context manager also closes the stored file object
    (unless the source was an already-open file-like object).

    Raises TypeError for a wrong argument count or an unsupported source type.
    """
    nargs = len(args)
    if nargs == 0:
        # BUGFIX: the original constructed this TypeError without raising it.
        raise TypeError(f'fileobj expected at least 1 argument, got {nargs}')
    elif nargs == 1:
        # BUGFIX: the original left ``attr`` unbound on this path, causing
        # a NameError below; default to the conventional attribute name.
        attr, filename = 'fileobj', args[0]
    elif nargs == 2:
        attr, filename = args
    else:
        raise TypeError(f'fileobj expected at most 2 arguments, got {nargs}')
    fileobj = partial(setattr, obj, attr)
    if isinstance(filename, str):
        fileobj(open(filename, 'rb'))
    elif isinstance(filename, Path):
        fileobj(filename.open('rb'))
    elif isinstance(filename, (bytes, bytearray)):
        fileobj(io.BytesIO(filename))
    elif isinstance(filename, io.IOBase) or (hasattr(filename, 'read') and callable(filename.read)):
        fileobj(filename)
        # file-like objects passed in shouldn't be closed by the context manager
        return None
    else:
        # BUGFIX: report the offending type instead of the literal '(unknown)'.
        raise TypeError(f"invalid type for file: '{type(filename).__name__}'")
    # https://filippo.io/instance-monkey-patching-in-python/
    if hasattr(obj, '__exit__'):
        from types import MethodType
        saved_exit = getattr(obj, '__exit__')
        if _is_nop(saved_exit):
            # The class's __exit__ does nothing; replace it outright.
            def __exit__(self, *args):
                self.fileobj.close()
        else:
            # Preserve the class's cleanup, then close the file.
            def __exit__(self, *args):
                saved_exit(*args)
                self.fileobj.close()
        obj.__exit__ = MethodType(__exit__, obj)
class file_property:
    """Data descriptor backing an attribute with a per-descriptor random
    slot; assignment routes through filectx(), which opens/wraps the value
    into a binary file object and hooks the owner's __exit__ for cleanup."""
    def __init__(self, name):
        from uuid import uuid4
        self.name = name
        # Unguessable instance attribute used as the storage slot.
        self.symbol = '_' + uuid4().hex
    def __get__(self, obj, cls):
        if obj is None:
            return self
        if not hasattr(obj, self.symbol):
            raise AttributeError(f"'{cls.__name__}' object has no attribute '{self.name}'")
        return getattr(obj, self.symbol)
    def __set__(self, obj, value):
        filectx(obj, self.symbol, value)
class final_property:
    """Write-once data descriptor: the first assignment sticks and any later
    assignment raises AttributeError.  The value is stored on the instance
    under a random per-descriptor attribute name."""
    def __init__(self, name):
        from uuid import uuid4
        self.name = name
        # Unguessable instance attribute used as the storage slot.
        self.symbol = '_' + uuid4().hex
    def __get__(self, obj, cls):
        if obj is None:
            return self
        if not hasattr(obj, self.symbol):
            raise AttributeError(f"'{cls.__name__}' object has no attribute '{self.name}'")
        return getattr(obj, self.symbol)
    def __set__(self, obj, value):
        if hasattr(obj, self.symbol):
            raise AttributeError(f"attribute '{self.name}' of '{self.__class__.__name__}' objects is final")
        setattr(obj, self.symbol, value)
class unsupported_property:
    """Descriptor that raises OSError on instance access — used to stub out
    io methods (write/fileno/truncate) a stream type does not support."""
    def __init__(self, name):
        self.name = name
    def __get__(self, obj, cls):
        if obj is not None:
            raise OSError(f'{self.name} not supported')
        return self
class RollingHashed(io.IOBase):
    """Read-only stream wrapper that ends reads at content-defined boundaries.

    A shift-xor rolling hash over the low ``bits`` bits is updated for each
    byte; when the hash equals the fixed ``hit`` value, the current read()
    stops at that offset and ``callback`` is invoked at the start of the
    *next* read (and from close()).  Consumers use the callback to flush
    state at split points that depend only on content, not on read sizes.
    """
    # Write-once configuration.
    bits = final_property('bits')
    mask = final_property('mask')
    callback = final_property('callback')
    # Accepts a path, Path, bytes-like, or file object (see filectx).
    fileobj = file_property('fileobj')

    def __init__(self, filename, callback, *, bits=12, block_size=None, callback_on_eof=False):
        self.fileobj = filename
        self.callback = callback
        self.callback_on_eof = callback_on_eof
        # Default read granularity: 16x the expected average gap between
        # boundary hits (2**bits bytes).
        if block_size is None: self.block_size = 2**(bits+4)
        else: self.block_size = block_size
        self.bits = bits
        self.mask = (1 << bits) - 1
        # Fixed target value the rolling hash must match to mark a boundary.
        self.hit = self.mask >> 1
        # state
        self.to_flush = False  # boundary pending: fire callback before next read
        self.hash = self.hit
        self.buf = b''
        self.pos = 0

    def read(self, *args):
        """Read up to the next boundary (or ``size``/``block_size`` bytes).

        Returns b'' at end of input.  Fires the pending boundary callback
        before pulling more data from the underlying stream.
        """
        nargs = len(args)
        if self.to_flush:
            self.callback()
            self.to_flush = False
        if nargs == 0:
            if len(self.buf) < self.block_size:
                self.buf += self.fileobj.read(self.block_size)
        elif nargs == 1:
            size = args[0]
            if size == 0: return b''
            needed = size - len(self.buf)
            if needed > 0: self.buf += self.fileobj.read(needed)
        else:
            raise TypeError(f'read expected at most 1 argument, got {nargs}')
        # end of file reached
        if len(self.buf) == 0:
            if self.callback_on_eof: self.callback()
            return b''
        # Scan buffered bytes, updating the rolling hash; cut at a boundary.
        # NOTE(review): the byte at 'offset' that completed the hit stays in
        # buf and is hashed again on the next read — confirm this is intended.
        for offset, value in enumerate(self.buf):
            self.hash = ((self.hash << 1) ^ value) & self.mask
            if self.hash == self.hit:
                self.to_flush = True
                self.pos += offset
                return self._shift(offset)
        return self._shift()

    def _shift(self, *args):
        """Detach and return the first ``count`` buffered bytes (all if omitted)."""
        nargs = len(args)
        if nargs == 0:
            head, tail = self.buf, b''
        elif nargs == 1:
            count = args[0]
            head, tail = self.buf[0:count], self.buf[count:]
        else: raise TypeError(f'_shift expected at most 1 argument, got {nargs}')
        self.buf = tail
        return head

    def close(self):
        # Deliver any pending boundary callback before closing the source.
        if self.to_flush:
            self.callback()
            self.to_flush = False
        self.fileobj.close()

    def tell(self): return self.pos
    def seekable(self): return False
    # BUGFIX: io.IOBase's hook is spelled 'writable'; the original defined
    # only the misspelled 'writeable', which io never calls.  Keep the old
    # name as an alias for backward compatibility.
    def writable(self): return False
    writeable = writable
    write = unsupported_property('write')
    fileno = unsupported_property('fileno')
    truncate = unsupported_property('truncate')
    def __enter__(self): return self
    # Intentionally a no-op: filectx() detects this and patches in an
    # __exit__ that closes self.fileobj.
    def __exit__(self, *exc): pass
# https://satisfactory.fandom.com/wiki/Save_files#Save_File_Format
class SatisfactorySaveFileReader:
    """Parses the header and compressed body chunks of a Satisfactory save.

    Format reference: the save-file layout documented on the Satisfactory
    wiki.  The scalar readers ``_read_byte`` / ``_read_int32`` /
    ``_read_int64`` are attached to this class after its definition via
    ``_mkread`` (see module level).
    """
    # Accepts a path, Path, bytes-like, or file object (see filectx).
    fileobj = file_property('fileobj')

    def __init__(self, filename):
        self.fileobj = filename
        self.header = self._read_header()
        # Keep the raw header bytes so callers can copy them through verbatim.
        if self.fileobj.seekable():
            size = self.fileobj.tell()
            self.fileobj.seek(0)
            self.raw_header = self.fileobj.read(size)

    def __enter__(self): return self
    # Intentionally a no-op: filectx() detects this and patches in an
    # __exit__ that closes self.fileobj.
    def __exit__(self, *exc): pass

    def tell(self):
        # BUGFIX: the original omitted 'self', making the method uncallable.
        return self.fileobj.tell()

    @staticmethod
    def _mkread(fmt):
        """Build a method that reads one struct of *fmt* (little-endian by
        default); single-field formats unpack to a scalar."""
        if fmt[0] not in '@=<>!': fmt = '<' + fmt
        base = fmt.lstrip('@=<>!')
        size = struct.calcsize(fmt)
        if len(base) == 1:
            return lambda self: struct.unpack(fmt, self.fileobj.read(size))[0]
        else:
            return lambda self: struct.unpack(fmt, self.fileobj.read(size))

    def _read_struct(self, fmt):
        """Read and unpack one struct of *fmt*, defaulting to little-endian."""
        if fmt[0] not in '@=<>!': fmt = '<' + fmt
        size = struct.calcsize(fmt)
        return struct.unpack(fmt, self.fileobj.read(size))

    def _read_string(self):
        """Read a 32-bit-length-prefixed, NUL-terminated string.

        Raises RuntimeError if the terminator is missing.
        """
        s = self.fileobj.read(self._read_int32())
        if len(s) == 0:
            return ''
        elif len(s) > 0 and s[-1] != 0:
            raise RuntimeError('String not null terminated')
        else:
            return s[0:-1].decode()

    def _read_bytes(self, count=None):
        """Read *count* bytes, or a 32-bit length prefix's worth if omitted."""
        if count is None: count = self._read_int32()
        b = self.fileobj.read(count)
        return b

    def _read_array(self, fmt):
        """Read a 32-bit count followed by that many structs of *fmt*."""
        count = self._read_int32()
        size = struct.calcsize(fmt)
        array = []
        for _ in range(count):
            array.append(struct.unpack(fmt, self.fileobj.read(size)))
        return array

    def _read_header(self):
        """Parse the uncompressed save header into a dict; rewinds first."""
        if self.fileobj.tell() != 0:
            if not self.fileobj.seekable():
                raise RuntimeError(f"Cannot seek '{self.fileobj.__class__.__name__}' object")
            self.fileobj.seek(0)
        d = {}
        d['header_version'] = self._read_int32()
        d['save_version'] = self._read_int32()
        d['build_version'] = self._read_int32()
        d['world_type'] = self._read_string()
        d['world_properties'] = self._read_string()
        d['session_name'] = self._read_string()
        d['play_time'] = self._read_int32()
        d['save_date'] = self._read_int64()
        d['session_visibility'] = self._read_byte()
        d['editor_object_version'] = self._read_int32()
        d['mod_metadata'] = self._read_bytes()
        d['mod_flags'] = self._read_int32()
        return d

    def _read_chunk(self):
        """Parse one compressed-chunk record; returns None at end of file."""
        d = {}
        try:
            d['package_file_tag'] = self._read_int64()
        except struct.error:
            # Short read: no more chunks.
            return None
        d['max_chunk_size'] = self._read_int64()
        d['compressed_length'] = self._read_int64()
        d['uncompressed_length'] = self._read_int64()
        d['compressed_length2'] = self._read_int64()
        d['uncompressed_length2'] = self._read_int64()
        d['zlib'] = self._read_bytes(d['compressed_length'])
        return d

    @staticmethod
    def serialize_chunk(compressed, *, package_file_tag=2653586369, max_chunk_size=131072):
        """Build a chunk record around already-compressed data.

        The lengths appear twice, mirroring the on-disk format; the
        uncompressed length is recovered by decompressing *compressed*.
        """
        clen = len(compressed)
        ulen = len(zlib.decompress(compressed))
        chunk = struct.pack(
            '<qqqqqq',
            package_file_tag, max_chunk_size,
            clen, ulen, clen, ulen,
        ) + compressed
        return chunk

    @staticmethod
    def serialize_string(s):
        """Serialize *s* as a 32-bit-length-prefixed NUL-terminated string."""
        b = s.encode() + b'\0'
        return struct.pack('<i', len(b)) + b

    @classmethod
    def serialize_header(cls, header):
        """Serialize a header dict (as produced by _read_header) to bytes."""
        buf = b''
        buf += struct.pack('<iii',
            header['header_version'], header['save_version'], header['build_version'],
        )
        buf += cls.serialize_string(header['world_type'])
        buf += cls.serialize_string(header['world_properties'])
        buf += cls.serialize_string(header['session_name'])
        buf += struct.pack('<iqBi',
            header['play_time'], header['save_date'],
            header['session_visibility'], header['editor_object_version'],
        )
        buf += struct.pack('<i', len(header['mod_metadata']))
        buf += header['mod_metadata']
        buf += struct.pack('<i', header['mod_flags'])
        return buf
# convienance
# Convenience scalar readers, generated once from their struct format codes.
for _name, _fmt in (('_read_byte', 'B'), ('_read_int32', 'i'), ('_read_int64', 'q')):
    setattr(SatisfactorySaveFileReader, _name, SatisfactorySaveFileReader._mkread(_fmt))
del _name, _fmt
class Chunker:
    """Compresses a byte stream into independent zlib chunks, each covering
    at most ``max_chunk_size`` bytes of uncompressed data, handing every
    finished chunk to *callback*."""

    def __init__(self, callback, *, max_chunk_size=131072, **kwargs):
        """*callback* receives the compressed bytes of each finished chunk;
        extra keyword args go to zlib.compressobj (level defaults to 9)."""
        self.callback = callback
        if 'level' not in kwargs: kwargs['level'] = 9
        self.max_chunk_size = max_chunk_size
        self._kwargs = kwargs
        self.reset()

    def reset(self):
        """Start a fresh chunk: new compressor, empty output, zero length."""
        self.cmpr = zlib.compressobj(**self._kwargs)
        self.compressed = bytearray(b'')
        self.ulen = 0

    def finish(self, buf=b''):
        """Finalize the current chunk (optionally appending *buf* first) and
        emit it via the callback; no-op when nothing was accumulated."""
        if len(buf):
            self.compressed += self.cmpr.compress(buf)
            self.ulen += len(buf)
        if self.ulen > 0:
            self.compressed += self.cmpr.flush(zlib.Z_FINISH)
            self.callback(self.compressed)
            self.reset()

    def flush(self):
        """Force out buffered compressor state without ending the chunk
        (Z_FULL_FLUSH resets the compressor's history at this point)."""
        self.compressed += self.cmpr.flush(zlib.Z_FULL_FLUSH)

    def add(self, buf):
        """Append *buf*, closing out chunks whenever adding it would exceed
        max_chunk_size of uncompressed data."""
        size = len(buf)
        if self.ulen + size > self.max_chunk_size:
            # A partially-filled chunk is finished as-is (split == 0) rather
            # than topped up to exactly max_chunk_size; only an empty chunk
            # takes a full-size slice.  (The original source had a dead
            # 'top-up' branch behind an 'elif True:' debugging remnant;
            # behavior is preserved here, with the dead code removed.)
            split = self.max_chunk_size if self.ulen == 0 else 0
            head, tail = buf[0:split], buf[split:]
            self.finish(head)
            self.add(tail)
        else:
            self.compressed += self.cmpr.compress(buf)
            self.ulen += len(buf)
def main():
    """Re-chunk the Satisfactory save named by argv[1], writing to stdout.

    The save body is fully decompressed, then recompressed with chunk
    boundaries chosen by a rolling hash over the uncompressed data, so
    identical content yields identical chunks (rsync/dedup friendly).
    The header is copied through verbatim.
    """
    import os, sys
    from hashlib import sha256  # used by the commented-out digest debug lines
    uncompressed = bytearray(b'')  # (the original also built an unused 'recompressed' buffer)
    # Binary view of stdout; closefd=False leaves sys.stdout itself usable.
    with os.fdopen(sys.stdout.fileno(), "wb", closefd=False) as stdout:
        with SatisfactorySaveFileReader(argv[1]) as s:
            stdout.write(s.raw_header)
            eprint(s.header)
            #eprint(sha256(s.raw_header).hexdigest())
            #eprint(sha256(s.serialize_header(s.header)).hexdigest())
            def inflate():
                # Yield each chunk's decompressed payload until EOF.
                while True:
                    chunk = s._read_chunk()
                    if chunk is None: break
                    data = zlib.decompress(chunk['zlib'])
                    yield data
            for data in inflate():
                uncompressed += data
        def write_chunk(compressed):
            chunk = SatisfactorySaveFileReader.serialize_chunk(compressed)
            stdout.write(chunk)
        c = Chunker(write_chunk, level=9)
        # Fire the compressor's full-flush at rolling-hash boundaries so the
        # compressed output depends only on content, not on read sizes.
        with RollingHashed(uncompressed, c.flush, bits=13, block_size=10000000) as rh:
            while True:
                buf = rh.read()
                c.add(buf)
                if len(buf) == 0: break
            c.finish()
        stdout.flush()
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment