Skip to content

Instantly share code, notes, and snippets.

@UserUnknownFactor
Created March 8, 2024 12:47
Show Gist options
  • Save UserUnknownFactor/309905256b183ff07d299de4833056eb to your computer and use it in GitHub Desktop.
Save UserUnknownFactor/309905256b183ff07d299de4833056eb to your computer and use it in GitHub Desktop.
Renpy raw .rpyc string extractor and repacker (useful if other tools fail to extract the strings)
#!/usr/bin/env python3
from __future__ import print_function
import sys
import os
import codecs
import errno
import random
try:
    import pickle5 as pickle
except ImportError:
    # pickle5 is optional: fall back to the standard library module, and only
    # warn on interpreters older than 3.8. Only a failed import is handled
    # here; any other error raised while importing is left to propagate.
    import pickle
    if sys.version_info < (3, 8):
        print('Warning: pickle5 module could not be loaded and Python version is < 3.8')
        print(" newer Ren'Py games may fail to unpack!")
        if sys.version_info >= (3, 5):
            print(' If this occurs, fix it by installing pickle5:')
            print(f' {sys.executable} -m pip install pickle5')
        else:
            print(' If this occurs, please upgrade to a newer Python (>= 3.5).\n')
            sys.exit(1)
        print()
class RenPyArchive:
    """Reader/writer for Ren'Py archives (RPA-2.0, RPA-3.0, RPA-3.2 and legacy ``.rpi``).

    ``indexes`` maps archive paths to the ``(offset, length[, prefix])`` entries
    of the archive currently open in ``handle``. ``files`` maps archive paths to
    the raw contents of files added in memory and not yet saved. Both dicts are
    created per instance. Saving is supported for versions 2 and 3 only.
    """

    archive_file = None
    handle = None
    version = None
    pad_length = 0
    key = None
    verbose = False
    RPA2_MAGIC = 'RPA-2.0 '
    RPA3_MAGIC = 'RPA-3.0 '
    RPA3_2_MAGIC = 'RPA-3.2 '
    # For backward compatibility, otherwise Python3-packed archives won't be read by Python2
    PICKLE_PROTOCOL = 2

    def __init__(self, file_name = None, version = 3, pad_length = 0, key = 0xDEADBEEF, verbose = False):
        """Open `file_name` if given, otherwise start an empty archive of `version`.

        `pad_length` is the maximum number of random bytes written before each
        file on save (0 disables padding); `key` is the RPAv3 obfuscation key.
        """
        # Per-instance containers: class-level dicts would be shared by every archive.
        self.files = {}
        self.indexes = {}
        self.pad_length = pad_length
        self.key = key
        self.verbose = verbose
        if file_name is not None:
            self.load(file_name)
        else:
            self.version = version

    def __del__(self):
        self.close_handle()

    def close_handle(self):
        """Close the open archive (if any) and reset the in-memory state."""
        if self.handle is not None:
            self.handle.close()
            self.handle = None
        self.files = {}
        self.version = None
        # Keep an empty dict (not None) so list()/add()/has_file() stay usable.
        self.indexes = {}

    # Determine archive version.
    def get_version(self):
        """Return 3.2, 3, 2 or 1 for the open archive.

        Raises:
            ValueError: if no known magic matches and the file is not ``.rpi``.
        """
        known = (
            (self.RPA3_2_MAGIC, 3.2),
            (self.RPA3_MAGIC, 3),
            (self.RPA2_MAGIC, 2),
        )
        self.handle.seek(0)
        # Compare raw bytes: the first line of an .rpi file is zlib data, and
        # decoding it as text would fail before the extension check is reached.
        head = self.handle.read(max(len(magic) for magic, _ in known))
        for magic, version in known:
            if head.startswith(magic.encode('ascii')):
                return version
        if self.archive_file.endswith('.rpi'):
            return 1
        raise ValueError("the given file is not a valid Ren'Py archive, or an unsupported version")

    # Extract file indexes from opened archive.
    def extract_indexes(self):
        """Read, decompress and (for v3/v3.2) deobfuscate the archive index.

        For v3/v3.2 archives ``self.key`` is replaced by the XOR of the key
        fields found in the header line.
        """
        # SECURITY NOTE: the index is unpickled; only open archives you trust,
        # since unpickling attacker-controlled data can execute arbitrary code.
        self.handle.seek(0)
        if self.version not in (2, 3, 3.2):
            # Version 1: the whole file is the zlib-compressed index.
            return pickle.loads(codecs.decode(self.handle.read(), 'zlib'), encoding='latin1')

        # Header line: magic, hexadecimal index offset, then the key field(s).
        vals = self.handle.readline().split()
        index_offset = int(vals[1], 16)
        if self.version == 3:
            subkeys = vals[2:]
        elif self.version == 3.2:
            subkeys = vals[3:]
        else:
            subkeys = None
        if subkeys is not None:
            self.key = 0
            for subkey in subkeys:
                self.key ^= int(subkey, 16)

        self.handle.seek(index_offset)
        contents = codecs.decode(self.handle.read(), 'zlib')
        indexes = pickle.loads(contents, encoding='latin1')
        if self.version == 2:
            return indexes

        # Deobfuscate: offsets and lengths are stored XOR-ed with the key.
        key = self.key
        plain = {}
        for name, entries in indexes.items():
            if len(entries[0]) == 2:
                plain[name] = [(offset ^ key, length ^ key) for offset, length in entries]
            else:
                plain[name] = [(offset ^ key, length ^ key, prefix) for offset, length, prefix in entries]
        return plain

    # Generate pseudorandom padding (for whatever reason).
    def generate_padding(self):
        """Return between 1 and ``pad_length`` random non-zero bytes."""
        length = random.randint(1, self.pad_length)
        # Build bytes directly; encoding chr() values as UTF-8 would emit two
        # bytes for every value >= 128 and exceed the requested maximum.
        return bytes(random.randint(1, 255) for _ in range(length))

    # Converts a filename to archive format.
    def convert_filename(self, filename):
        """Normalise `filename`, use '/' separators and strip any drive prefix."""
        normalised = os.path.normpath(filename).replace(os.sep, '/')
        return os.path.splitdrive(normalised)[1]

    # Debug (verbose) messages.
    def verbose_print(self, message):
        if self.verbose:
            print(message)

    # List files in archive and current internal storage.
    def list(self):
        """Return the archive's file names followed by the pending in-memory ones."""
        return [*self.indexes, *self.files]

    # Check if a file exists in the archive.
    def has_file(self, filename):
        """Return True if `filename` is in the archive or in internal storage."""
        filename = self.convert_filename(filename)
        return filename in self.indexes or filename in self.files

    # Read file from archive or internal storage.
    def read(self, filename):
        """Return the contents of `filename` as bytes.

        Raises:
            IOError: (ENOENT) if the file is unknown, or is only in the archive
                index while no archive handle is open.
        """
        filename = self.convert_filename(filename)
        # Files added in this session take precedence over the archive.
        if filename in self.files:
            self.verbose_print('Reading file {0} from internal storage...'.format(filename))
            return self.files[filename]
        if filename not in self.indexes or self.handle is None:
            raise IOError(errno.ENOENT, 'the requested file {0} does not exist in the given Ren\'Py archive'.format(
                filename))

        # Only the first index entry of a file is read.
        entry = self.indexes[filename][0]
        if len(entry) == 3:
            (offset, length, prefix) = entry
        else:
            (offset, length) = entry
            prefix = b''
        self.verbose_print(f'Reading file {filename} from data file {self.archive_file}... (offset = {offset}, length = {length} bytes)')
        self.handle.seek(offset)
        # The index is unpickled with latin1, so a prefix may arrive as str.
        if not isinstance(prefix, bytes):
            prefix = prefix.encode('latin1')
        return prefix + self.handle.read(length - len(prefix))

    # Modify a file in archive or internal storage.
    def change(self, filename, contents):
        """Replace `filename` by removing it and adding the new `contents`."""
        self.remove(filename)
        self.add(filename, contents)

    # Add a file to the internal storage.
    def add(self, filename, contents):
        """Queue `contents` under `filename`; existing names are reported and skipped."""
        filename = self.convert_filename(filename)
        if filename in self.files or filename in self.indexes:
            print(f'file {filename} already exists in archive')
            return
        self.verbose_print(f'Adding file {filename} to archive... (length = {len(contents)} bytes)')
        self.files[filename] = contents

    # Remove a file from archive or internal storage.
    def remove(self, filename):
        """Drop `filename` from internal storage or from the archive index."""
        # Normalise exactly like add()/read() so the same spelling finds the entry.
        filename = self.convert_filename(filename)
        if filename in self.files:
            self.verbose_print(f'Removing file {filename} from internal storage...')
            del self.files[filename]
        elif filename in self.indexes:
            self.verbose_print(f'Removing file {filename} from archive indexes...')
            del self.indexes[filename]
        else:
            print(f'the requested file {filename} does not exist in this archive')

    # Load archive.
    def load(self, filename):
        """Open `filename` and read its version and index.

        Failures are reported on stdout rather than raised; the instance is
        then left closed and empty.
        """
        self.close_handle()
        self.archive_file = filename
        try:
            self.handle = open(self.archive_file, 'rb')
            self.version = self.get_version()
            self.indexes = self.extract_indexes()
        except Exception as e:
            print(f"can't load archive {filename}: {str(e)}")
            # Do not keep a half-initialised archive (open handle, missing index).
            self.close_handle()

    def _format_header(self, offset):
        """Return the encoded header line for the current version and index `offset`."""
        if self.version == 3:
            text = '{}{:016x} {:08x}\n'.format(self.RPA3_MAGIC, offset, self.key)
        else:
            text = '{}{:016x}\n'.format(self.RPA2_MAGIC, offset)
        return text.encode('utf-8')

    # Save current state into a new file, merging archive and internal storage, rebuilding indexes, and optionally saving in another format version.
    def save(self, filename = None):
        """Write all files plus a rebuilt index to `filename`, then reload it.

        `filename` defaults to the archive that was loaded.

        Raises:
            ValueError: if there is no target file or the version is not 2 or 3.
        """
        if filename is None:
            filename = self.archive_file
        if filename is None:
            raise ValueError('no target file found for saving archive')
        if self.version != 2 and self.version != 3:
            raise ValueError('saving is only supported for version 2 and 3 archives')

        self.verbose_print('Rebuilding archive index...')
        # Pull every archived file into memory first: the target may be the
        # very file we are reading from.
        files = self.files
        for name in tuple(self.indexes):
            files[name] = self.read(name)
            del self.indexes[name]

        # Reserve room for the header, which is written last. Measuring the
        # real header keeps unusually wide keys from overrunning file data.
        offset = len(self._format_header(0))
        indexes = {}
        with open(filename, 'wb') as archive:
            archive.seek(offset)
            self.verbose_print('Writing files to archive file...')
            for name, content in files.items():
                # Generate random padding, for whatever reason.
                if self.pad_length > 0:
                    padding = self.generate_padding()
                    archive.write(padding)
                    offset += len(padding)
                archive.write(content)
                if self.version == 3:
                    indexes[name] = [ (offset ^ self.key, len(content) ^ self.key) ]
                else:
                    indexes[name] = [ (offset, len(content)) ]
                offset += len(content)

            self.verbose_print('Writing archive index to archive file...')
            archive.write(codecs.encode(pickle.dumps(indexes, self.PICKLE_PROTOCOL), 'zlib'))

            self.verbose_print(f'Writing header to archive file... (version = RPAv{self.version})')
            archive.seek(0)
            archive.write(self._format_header(offset))

        # Reload the file in our inner database.
        self.load(filename)
if __name__ == "__main__":
    # Command-line front end: exactly one of -l/-x/-c/-d/-a selects the operation.
    import argparse

    parser = argparse.ArgumentParser(
        description='A tool for working with Ren\'Py archive files.',
        epilog='The FILE argument can optionally be in ARCHIVE=REAL format, mapping a file in the archive file system to a file on your real file system. An example of this: rpatool -x test.rpa script.rpyc=/home/foo/test.rpyc',
        add_help=False)
    parser.add_argument('archive', metavar='ARCHIVE', help='The Ren\'py archive file to operate on.')
    parser.add_argument('files', metavar='FILE', nargs='*', action='append', help='Zero or more files to operate on.')
    parser.add_argument('-l', '--list', action='store_true', help='List files in archive ARCHIVE.')
    parser.add_argument('-x', '--extract', action='store_true', help='Extract FILEs from ARCHIVE.')
    parser.add_argument('-c', '--create', action='store_true', help='Create ARCHIVE from FILEs.')
    parser.add_argument('-d', '--delete', action='store_true', help='Delete FILEs from ARCHIVE.')
    parser.add_argument('-a', '--append', action='store_true', help='Append FILEs to ARCHIVE.')
    parser.add_argument('-2', '--two', action='store_true', help='Use the RPAv2 format for creating/appending to archives.')
    parser.add_argument('-3', '--three', action='store_true', help='Use the RPAv3 format for creating/appending to archives (default).')
    parser.add_argument('-k', '--key', metavar='KEY', help='The obfuscation key used for creating RPAv3 archives, in hexadecimal (default: 0xDEADBEEF).')
    parser.add_argument('-p', '--padding', metavar='COUNT', help='The maximum number of bytes of padding to add between files (default: 0).')
    parser.add_argument('-o', '--outfile', help='An alternative output archive file when appending to or deleting from archives, or output directory when extracting.')
    parser.add_argument('-h', '--help', action='help', help='Print this help and exit.')
    parser.add_argument('-v', '--verbose', action='store_true', help='Be a bit more verbose while performing operations.')
    parser.add_argument('-V', '--version', action='version', version='rpatool v0.8', help='Show version information.')
    arguments = parser.parse_args()

    # Determine RPA version, RPAv3 key and padding bytes.
    version = 2 if arguments.two else 3
    key = int(arguments.key, 16) if arguments.key is not None else 0xDEADBEEF
    padding = int(arguments.padding) if arguments.padding is not None else 0

    # Determine output file/directory and input archive
    if arguments.create:
        archive = None
        output = arguments.archive
    else:
        archive = arguments.archive
        if arguments.outfile is not None:
            output = arguments.outfile
        elif arguments.extract:
            # Default output directory for extraction is the current directory.
            output = '.'
        else:
            output = arguments.archive

    # Normalize files: nargs='*' combined with action='append' yields a nested list.
    if len(arguments.files) > 0 and isinstance(arguments.files[0], list):
        arguments.files = arguments.files[0]

    def split_mapping(spec):
        """Split an ARCHIVE=REAL argument; a plain name maps to itself."""
        if '=' in spec:
            # maxsplit=1: any further '=' belongs to the real path.
            (archive_name, real_name) = spec.split('=', 1)
            return archive_name, real_name
        return spec, spec

    try:
        archive = RenPyArchive(archive, pad_length=padding, key=key, version=version, verbose=arguments.verbose)
    except IOError as e:
        print('Could not open archive file {0} for reading: {1}'.format(archive, e), file=sys.stderr)
        sys.exit(1)

    if arguments.create or arguments.append:
        # We need this separate function to recursively process directories.
        def add_file(filename, outfile=None):
            # Top-level calls pass the raw argument; recursive calls pass the
            # (real path, archive path) pair directly so that names containing
            # '=' inside a directory are not split again.
            if outfile is None:
                (outfile, filename) = split_mapping(filename)
            if os.path.isdir(filename):
                for entry in os.listdir(filename):
                    add_file(filename + os.sep + entry, outfile + os.sep + entry)
            else:
                try:
                    with open(filename, 'rb') as file:
                        archive.add(outfile, file.read())
                except Exception as e:
                    print('Could not add file {0} to archive: {1}'.format(filename, e), file=sys.stderr)

        # Iterate over the given files to add to archive.
        for filename in arguments.files:
            add_file(filename)

        # Set version for saving, and save.
        archive.version = version
        try:
            archive.save(output)
        except Exception as e:
            print('Could not save archive file: {0}'.format(e), file=sys.stderr)
    elif arguments.delete:
        # Iterate over the given files to delete from the archive.
        for filename in arguments.files:
            try:
                archive.remove(filename)
            except Exception as e:
                print('Could not delete file {0} from archive: {1}'.format(filename, e), file=sys.stderr)

        # Set version for saving, and save.
        archive.version = version
        try:
            archive.save(output)
        except Exception as e:
            print('Could not save archive file: {0}'.format(e), file=sys.stderr)
    elif arguments.extract:
        # Either extract the given files, or all files if no files are given.
        if len(arguments.files) > 0:
            files = arguments.files
        else:
            files = archive.list()

        # Create output directory if not present.
        if not os.path.exists(output):
            os.makedirs(output)

        # Iterate over files to extract.
        for filename in files:
            # ARCHIVE=REAL: read ARCHIVE from the archive, write it to REAL.
            (filename, outfile) = split_mapping(filename)
            try:
                contents = archive.read(filename)
                # NOTE(review): archive member names are used as output paths
                # unchanged, so an untrusted archive containing '..' or
                # absolute names can write outside `output`.
                target = os.path.join(output, outfile)
                target_dir = os.path.dirname(target)
                # Create output directory for file if not present.
                if target_dir:
                    os.makedirs(target_dir, exist_ok=True)
                with open(target, 'wb') as file:
                    file.write(contents)
            except Exception as e:
                print(f'Could not extract file {filename} from archive: {str(e)}', file=sys.stderr)
    elif arguments.list:
        # Print the sorted file list. The local name deliberately avoids
        # shadowing the builtin `list` at module level.
        for file in sorted(archive.list()):
            print(file)
    else:
        print('No operation given :(')
        print(f'Use {sys.argv[0]} --help for usage details.')
from pkg_resources import parse_version
import kaitaistruct
from kaitaistruct import KaitaiStruct, KaitaiStream, BytesIO
from enum import IntEnum
import zlib, struct
from os import SEEK_SET, SEEK_CUR, SEEK_END
# Presumably the default output directory name; its use is outside this excerpt.
DEF_OUT_DIR = 'translation_out'

# Refuse to run against a Kaitai Struct runtime older than 0.10.
if not parse_version(kaitaistruct.__version__) >= parse_version("0.10"):
    raise Exception(
        "Incompatible Kaitai Struct Python API: 0.10 or later is required, but you have %s"
        % (kaitaistruct.__version__))
def replace_str_bytes(buffer: bytes, offset: int, slen: int, *args):
    """Return a copy of `buffer` with `slen` bytes at `offset` overwritten.

    The replacement is given either as one bytes object (packed with struct's
    ``s`` format, i.e. truncated to `slen` or NUL-padded up to it) or as `slen`
    separate single-byte values (the original ``c``-format calling convention).

    Example: replace_str_bytes(b"hello world", 6, 5, b"WORLD") == b"hello WORLD"

    Raises:
        struct.error: if the region does not fit inside `buffer` or the
            arguments do not match the format.
    """
    buffer = bytearray(buffer)
    # A single argument is the whole replacement string; "<n>c" would instead
    # demand n separate one-byte arguments and reject it.
    fmt = f"{slen}s" if len(args) == 1 else f"{slen}c"
    struct.pack_into(fmt, buffer, offset, *args)
    return bytes(buffer)
class PythonPickle(KaitaiStruct):
"""Python Pickle format serializes Python objects to a byte stream, as a sequence
of operations to run on the Pickle Virtual Machine.
The format is mostly implementation defined, there is no formal specification.
Pickle data types are closely coupled to the Python object model.
Python singletons, and most builtin types (e.g. `None`, `int`,`dict`, `list`)
are serialised using dedicated Pickle opcodes.
Other builtin types, and all classes (e.g. `set`, `datetime.datetime`) are
serialised by encoding the name of a constructor callable.
They are deserialised by importing that constructor, and calling it.
So, unpickling an arbitrary pickle, using the Python's stdlib pickle module
can cause arbitrary code execution.
Pickle format has evolved with Python, later protocols add opcodes & types.
Later Python releases can pickle to or unpickle from any earlier protocol.
* Protocol 0: ASCII clean, no explicit version, fields are '\n' terminated.
* Protocol 1: Binary, no explicit version, first length prefixed types.
* Protocol 2 ([PEP 307](https://peps.python.org/pep-0307/)): Python 2.3+.
Explicit versioning, more length prefixed types.
* Protocol 3: Python 3.0+. Dedicated opcodes for `bytes` objects.
* Protocol 4 ([PEP 3154](https://peps.python.org/pep-3154/)): Python 3.4+.
Opcodes for 64 bit strings, framing, `set`.
* Protocol 5 ([PEP 574](https://peps.python.org/pep-0574/)): Python 3.8+:
Opcodes for `bytearray` and out of band data
.. See also::
Source - https://github.com/python/cpython/blob/v3.8.1/Lib/pickletools.py
"""
class Opcode(IntEnum):
    """Single-byte pickle opcodes, keyed by their byte value (ascending order).

    Several member names (float, int, long, dict, list, tuple, frozenset)
    mirror Python builtins; they only shadow them inside this class body.
    """
    # 0x28-0x7D: printable-ASCII opcodes. Per CPython's pickle.py these belong
    # to protocols 0/1, except binbytes/short_binbytes (protocol 3).
    mark = 0x28
    empty_tuple = 0x29
    stop = 0x2E
    pop = 0x30
    pop_mark = 0x31
    dup = 0x32
    binbytes = 0x42
    short_binbytes = 0x43
    float = 0x46
    binfloat = 0x47
    int = 0x49
    binint = 0x4A
    binint1 = 0x4B
    long = 0x4C
    binint2 = 0x4D
    none = 0x4E
    persid = 0x50
    binpersid = 0x51
    reduce = 0x52
    string = 0x53
    binstring = 0x54
    short_binstring = 0x55
    unicode = 0x56
    binunicode = 0x58
    empty_list = 0x5D
    append = 0x61
    build = 0x62
    global_opcode = 0x63
    dict = 0x64
    appends = 0x65
    get = 0x67
    binget = 0x68
    inst = 0x69
    long_binget = 0x6A
    list = 0x6C
    obj = 0x6F
    put = 0x70
    binput = 0x71
    long_binput = 0x72
    setitem = 0x73
    tuple = 0x74
    setitems = 0x75
    empty_dict = 0x7D
    # 0x80-0x8B: protocol 2 (explicit version marker plus compact encodings).
    proto = 0x80
    newobj = 0x81
    ext1 = 0x82
    ext2 = 0x83
    ext4 = 0x84
    tuple1 = 0x85
    tuple2 = 0x86
    tuple3 = 0x87
    newtrue = 0x88
    newfalse = 0x89
    long1 = 0x8A
    long4 = 0x8B
    # 0x8C-0x95: protocol 4 (64-bit lengths, sets, framing).
    short_binunicode = 0x8C
    binunicode8 = 0x8D
    binbytes8 = 0x8E
    empty_set = 0x8F
    additems = 0x90
    frozenset = 0x91
    newobj_ex = 0x92
    stack_global = 0x93
    memoize = 0x94
    frame = 0x95
    # 0x96-0x98: protocol 5 (bytearray and out-of-band buffers).
    bytearray8 = 0x96
    next_buffer = 0x97
    readonly_buffer = 0x98
# Signature that _read() expects in the first 0x0A bytes of the stream.
FORMAT_MAGIC = b"RENPY RPC2"
def __init__(self, _io=None, _parent=None, _root=None, _init=False):
    """Forward all arguments to the base initializer, then parse if `self._init` is set."""
    super().__init__(_io, _parent, _root, _init)
    if not self._init:
        return
    self._read()
def _read(self):
    """Parse the file: magic, three header entries, then the opcode stream.

    Raises:
        kaitaistruct.ValidationNotEqualError: if the first 10 bytes differ
            from FORMAT_MAGIC.
    """
    stream = self._io
    stream.seek(0)
    self.magic = stream.read_bytes(0x0A)
    if self.magic != self.FORMAT_MAGIC:
        raise kaitaistruct.ValidationNotEqualError(self.FORMAT_MAGIC, self.magic, self._io, u"/seq/0")

    # Create all three entries first (unparsed), then read them in order.
    entries = []
    for _ in range(3):
        entries.append(PythonPickle.HeaderData(self._root._io, self, self._root, False))
    self.data = entries
    for entry in self.data:
        entry._read()

    # Everything up to here is copied verbatim by _write().
    self._header_end = self._io.pos()
    self.opcodes = self.get_opcodes()
def _write(self, stream=None):
    """Serialise the opcodes and write all header entries.

    With `stream` omitted the root stream is rewritten in place: it is
    truncated right after the header and its position is restored at the end.
    Otherwise the header bytes are copied to `stream`, which is left at
    position 0.
    """
    source = self._root._io
    restore_pos = 0
    if stream is not None:
        source.seek(0)
        stream.write_bytes(source.read_bytes(self._header_end))
    else:
        restore_pos = source.pos()
        stream = source
        source.seek(self._header_end)
        source._io.truncate()

    # Re-encode every opcode into a scratch stream, then compress the result.
    scratch = KaitaiStream(BytesIO())
    for op in self.opcodes:
        op._write(scratch)
    scratch.seek(0)
    encoded = scratch.read_bytes_full()
    # Level 6: per the original author's note, the same as Ren'Py uses.
    self.data[1].data = zlib.compress(encoded, 6)

    for entry in self.data:
        entry._write(stream)
    stream.seek(restore_pos)
def update_opcode_strings(self, strings):
    """Replace the text of every string opcode with entries from `strings`.

    Each string opcode consumes as many consecutive entries as its current
    value has lines; they are re-joined with the separator found in that value
    ('\\r\\n' if present, else '\\n').

    Raises:
        AssertionError: if `strings` runs out before every string opcode has
            been filled.
    """
    # NOTE(review): surplus entries at the end of `strings` are not reported.
    mismatch = "String sequence and count must match that of the original .rpyc file"
    cursor = 0
    for op in self.opcodes:
        if not op._is_string:
            continue
        current = op.arg.val
        separator = '\r\n' if '\r\n' in current else '\n'
        line_count = len(current.split(separator))
        replacement = strings[cursor:cursor + line_count]
        # Raised explicitly (not via `assert`) so the check survives `python -O`;
        # the exception type is unchanged for existing callers.
        if len(replacement) < line_count:
            raise AssertionError(mismatch)
        cursor += line_count
        op.arg.val = separator.join(replacement)
def get_opcodes(self):
    """Decompress the second header entry and parse it into a list of Op objects.

    Fills `self.opcodes` (up to and including the stop opcode) and
    `self.strings` (the `arg.val` of every op flagged `_is_string`), restores
    the root stream position and returns `self.opcodes`.
    """
    root_io = self._root._io
    saved_pos = root_io.pos()
    stop_code = PythonPickle.Opcode.stop

    self._raw_opcodes = zlib.decompress(self.data[1].data)
    op_stream = KaitaiStream(BytesIO(self._raw_opcodes))
    self.opcodes = []
    self.strings = []

    op = PythonPickle.Op(op_stream, self, self._root, self._init)
    while op:
        self.opcodes.append(op)
        if op.code == stop_code:
            break
        if op._is_string:
            self.strings.append(op.arg.val)
        op = PythonPickle.Op(op_stream, self, self._root, self._init)

    root_io.seek(saved_pos)
    return self.opcodes
class Unicodestring8(KaitaiStruct):
    """UTF-8 string behind an 8-byte length, 0 to 2**64-1 bytes long.

    Only a 64-bit build of Python would produce a pickle with strings large
    enough to need this type; a 32-bit build could not unpickle it because the
    string would exceed `sys.maxsize`.

    `val` is the decoded text; `size` is the encoded length including the prefix.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        # Length prefix first, then exactly that many payload bytes.
        payload = self._io.read_bytes(self._io.read_u8le())
        self.val = payload.decode("utf-8")
        self.size = len(payload) + 8

    def _write(self, stream: KaitaiStream):
        payload = self.val.encode("utf-8")
        stream.write_u8le(len(payload))
        stream.write_bytes(payload)
        self.size = len(payload) + 8
class Long1(KaitaiStruct):
    """Large signed integer, -2**(8*255-1) to 2**(8*255-1)-1, stored as
    two's-complement bytes behind a 1-byte length.

    `val` keeps the undecoded bytes; `size` includes the length prefix.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        count = self._io.read_u1()
        self.val = self._io.read_bytes(count)
        self.size = len(self.val) + 1

    def _write(self, stream: KaitaiStream):
        payload = self.val
        stream.write_u1(len(payload))
        stream.write_bytes(payload)
        self.size = len(payload) + 1
class Bytes8(KaitaiStruct):
    """Byte string behind an 8-byte length, 0 to 2**64-1 bytes long.

    Only a 64-bit build of Python would produce a pickle with strings large
    enough to need this type; a 32-bit build could not unpickle it because the
    string would exceed `sys.maxsize`.

    `val` holds the raw bytes; `size` includes the length prefix.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        count = self._io.read_u8le()
        self.val = self._io.read_bytes(count)
        self.size = len(self.val) + 8

    def _write(self, stream: KaitaiStream):
        payload = self.val
        stream.write_u8le(len(payload))
        stream.write_bytes(payload)
        self.size = len(payload) + 8
class Bytes1(KaitaiStruct):
    """Byte string behind a 1-byte length, 0 to 255 bytes long.

    `val` holds the raw bytes; `size` includes the length prefix.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        count = self._io.read_u1()
        self.val = self._io.read_bytes(count)
        self.size = len(self.val) + 1

    def _write(self, stream: KaitaiStream):
        payload = self.val
        stream.write_u1(len(payload))
        stream.write_bytes(payload)
        self.size = len(payload) + 1
class Bytes4(KaitaiStruct):
    """Byte string behind a 4-byte length, 0 to 2**32-1 bytes long.

    `val` holds the raw bytes; `size` includes the length prefix.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        count = self._io.read_u4le()
        self.val = self._io.read_bytes(count)
        self.size = len(self.val) + 4

    def _write(self, stream: KaitaiStream):
        payload = self.val
        stream.write_u4le(len(payload))
        stream.write_bytes(payload)
        self.size = len(payload) + 4
class NoArg(KaitaiStruct):
    """Placeholder argument for opcodes that carry no payload; `size` is always 0."""

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        self.size = 0

    def _read(self):
        # Nothing to parse.
        return None

    def _write(self, stream: KaitaiStream):
        # Nothing to emit.
        return None
class StringnlNoescape(KaitaiStruct):
    """Unquoted, newline-terminated ASCII string without string escapes.

    `val` is the text without the terminator; `size` is the encoded length
    including the terminating 0x0A byte.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if self._init:
            self._read()

    def _read(self):
        # Terminator is consumed but not returned (flags as in the original call).
        data = self._io.read_bytes_term(0x0A, False, True, True)
        self.val = data.decode("ascii")
        # Count the consumed terminator, as _write() does for the same value.
        self.size = 1 + len(data)

    def _write(self, stream: KaitaiStream):
        data = self.val.encode("ascii")
        stream.write_bytes(data)
        stream.write_bytes(b'\x0A')
        self.size = 1 + len(data)
class DecimalnlLong(KaitaiStruct):
    """Integer written with the ASCII characters [0-9-], followed by 'L' and a newline.

    `val` keeps the text exactly as stored (without the newline); `size` is
    the encoded length including the terminating 0x0A byte.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if self._init:
            self._read()

    def _read(self):
        # Terminator is consumed but not returned (flags as in the original call).
        data = self._io.read_bytes_term(0x0A, False, True, True)
        self.val = data.decode("ascii")
        # Count the consumed terminator, as _write() does for the same value.
        self.size = 1 + len(data)

    def _write(self, stream: KaitaiStream):
        data = self.val.encode("ascii")
        stream.write_bytes(data)
        stream.write_bytes(b'\x0A')
        self.size = 1 + len(data)
class Unicodestring4(KaitaiStruct):
    """UTF-8 string behind a 4-byte length (format limit: 2**32-1 bytes).

    `val` is the decoded text; `size` is the encoded length including the prefix.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        count = self._io.read_u4le()
        # NOTE(review): sanity limit far below what the format allows; it is an
        # `assert`, so it disappears under `python -O` — confirm the intent.
        assert count < 10000, "Unicodestring4 is impossibly long"
        payload = self._io.read_bytes(count)
        self.val = payload.decode("utf-8")
        self.size = len(payload) + 4

    def _write(self, stream: KaitaiStream):
        payload = self.val.encode("utf-8")
        stream.write_u4le(len(payload))
        stream.write_bytes(payload)
        self.size = len(payload) + 4
class Unicodestringnl(KaitaiStruct):
    """Unquoted, newline-terminated string containing Python Unicode escapes.

    `val` is the ASCII text as stored (escapes are not interpreted); `size` is
    the encoded length including the terminating 0x0A byte.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if self._init:
            self._read()

    def _read(self):
        # Terminator is consumed but not returned (flags as in the original call).
        data = self._io.read_bytes_term(0x0A, False, True, True)
        self.val = data.decode("ascii")
        # Count the consumed terminator, as _write() does for the same value.
        self.size = 1 + len(data)

    def _write(self, stream: KaitaiStream):
        data = self.val.encode("ascii")
        stream.write_bytes(data)
        stream.write_bytes(b'\x0A')
        self.size = 1 + len(data)
class Long4(KaitaiStruct):
    """Large signed integer stored as two's-complement bytes behind a 4-byte length.

    `val` keeps the undecoded bytes; `size` includes the length prefix.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        count = self._io.read_u4le()
        self.val = self._io.read_bytes(count)
        self.size = len(self.val) + 4

    def _write(self, stream: KaitaiStream):
        payload = self.val
        stream.write_u4le(len(payload))
        stream.write_bytes(payload)
        self.size = len(payload) + 4
class String1(KaitaiStruct):
    """Byte string behind a 1-byte length (0 to 255 bytes); encoding unspecified.

    Python 2's default `str` is a byte sequence and is pickled as `string1` or
    `string4` under protocol 2, the bytes being written as-is. Python 3 never
    emits these types and uses opcodes with a known encoding instead.
    When unpickling:
    - `pickle.Unpickler` defaults to ASCII, which can be overridden
    - `pickletools.dis` uses latin1, which cannot be overridden

    Here `val` is kept as raw bytes (no decoding); `size` includes the prefix.

    .. See also::
       Source - https://github.com/python/cpython/blob/bb8071a4cae/Lib/pickle.py#L486-L495
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        count = self._io.read_u1()
        self.val = self._io.read_bytes(count)
        self.size = len(self.val) + 1

    def _write(self, stream: KaitaiStream):
        payload = self.val
        stream.write_u1(len(payload))
        stream.write_bytes(payload)
        self.size = len(payload) + 1
class Bytearray8(KaitaiStruct):
    """Bytearray payload behind an 8-byte length, 0 to 2**64-1 bytes long.

    On unpickling the contents become a `bytearray`; here `val` holds the raw
    bytes as read, and `size` includes the length prefix.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        count = self._io.read_u8le()
        self.val = self._io.read_bytes(count)
        self.size = len(self.val) + 8

    def _write(self, stream: KaitaiStream):
        payload = self.val
        stream.write_u8le(len(payload))
        stream.write_bytes(payload)
        self.size = len(payload) + 8
class DecimalnlShort(KaitaiStruct):
    """Newline-terminated integer or boolean written with the ASCII characters [0-9-].

    The values '00' and '01' encode `False` and `True`; ordinary numbers are
    not expected to carry leading zeros. `val` keeps the text as stored;
    `size` counts the text plus its terminator.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        raw = self._io.read_bytes_term(0x0A, False, True, True)
        self.val = raw.decode("ascii")
        self.size = len(self.val) + 1

    def _write(self, stream: KaitaiStream):
        raw = self.val.encode("ascii")
        stream.write_bytes(raw)
        stream.write_bytes(b'\x0A')
        self.size = len(raw) + 1
class Unicodestring1(KaitaiStruct):
    """UTF-8 string behind a 1-byte length, 0 to 255 bytes long.

    `val` is the decoded text; `size` is the encoded length including the prefix.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        payload = self._io.read_bytes(self._io.read_u1())
        self.val = payload.decode("utf-8")
        self.size = len(payload) + 1

    def _write(self, stream: KaitaiStream):
        payload = self.val.encode("utf-8")
        stream.write_u1(len(payload))
        stream.write_bytes(payload)
        self.size = len(payload) + 1
class Stringnl(KaitaiStruct):
    """Quoted, newline-terminated string that may contain Python string escapes.

    `val` keeps the ASCII text as stored (quotes and escapes untouched);
    `size` counts the text plus its terminator.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        raw = self._io.read_bytes_term(0x0A, False, True, True)
        self.val = raw.decode("ascii")
        self.size = len(raw) + 1

    def _write(self, stream: KaitaiStream):
        raw = self.val.encode("ascii")
        stream.write_bytes(raw)
        stream.write_bytes(b'\x0A')
        self.size = len(raw) + 1
class StringnlNoescapePair(KaitaiStruct):
    """Two consecutive unquoted, unescaped newline-terminated strings (`val1`, `val2`).

    `size` is the sum of the two parts' sizes.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        part_type = PythonPickle.StringnlNoescape
        self.val1 = part_type(self._io, self, self._root, self._init)
        self.val2 = part_type(self._io, self, self._root, self._init)
        self.size = sum(part.size for part in (self.val1, self.val2))

    def _write(self, stream: KaitaiStream):
        for part in (self.val1, self.val2):
            part._write(stream)
        self.size = sum(part.size for part in (self.val1, self.val2))
class String4(KaitaiStruct):
    """Byte string behind a signed 4-byte length (0 to 2**31-1 bytes); encoding unspecified.

    The length field is signed, but any length < 0 raises an exception during
    unpickling. See `String1` for details about encodings.

    `val` holds the raw bytes; `size` includes the length prefix.

    .. See also::
       Source - https://github.com/python/cpython/blob/bb8071a4cae/Lib/pickle.py#L486-L495
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        count = self._io.read_s4le()
        self.val = self._io.read_bytes(count)
        self.size = len(self.val) + 4

    def _write(self, stream: KaitaiStream):
        payload = self.val
        stream.write_s4le(len(payload))
        stream.write_bytes(payload)
        self.size = len(payload) + 4
class Floatnl(KaitaiStruct):
    """Newline-terminated double written with the ASCII characters [0-9.e+-],
    or one of '-inf', 'inf', 'nan'.

    `val` keeps the text as stored; `size` is the encoded length including the
    terminating 0x0A byte.
    """

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if self._init:
            self._read()

    def _read(self):
        # Terminator is consumed but not returned (flags as in the original call).
        data = self._io.read_bytes_term(0x0A, False, True, True)
        self.val = data.decode("ascii")
        # There is no 4-byte length prefix here: the only overhead is the
        # one-byte terminator, exactly as _write() accounts for it.
        self.size = 1 + len(data)

    def _write(self, stream: KaitaiStream):
        data = self.val.encode("ascii")
        stream.write_bytes(data)
        stream.write_bytes(b'\x0A')
        self.size = 1 + len(data)
class HeaderData(KaitaiStruct):
    """One 12-byte header entry (`slot`, `start`, `length`) plus the data block it points to.

    All reads go through the root stream; `data` holds the `length` bytes
    found at absolute offset `start`.
    """
    __slots__ = ["slot", "start", "length"]

    def __init__(self, _io=None, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        if not self._init:
            return
        self._read()

    def _read(self):
        root_io = self._root._io
        # Remember where this entry sits so _write() can rewrite it in place.
        self._pos = root_io.pos()
        self.slot = root_io.read_u4le()
        self.start = root_io.read_u4le()
        self.length = root_io.read_u4le()
        # Fetch the referenced block, then return to just past the three u4 fields.
        root_io.seek(self.start)
        self.data = root_io.read_bytes(self.length)
        root_io.seek(self._pos + 3 * 4)

    def _write(self, stream=None):
        """Append `data` (when non-empty) at the end of `stream` and rewrite the entry."""
        target = self._root._io if stream is None else stream
        target.seek(0, SEEK_END)
        if self.data:
            self.start = target.pos()
            # NOTE(review): relies on write_bytes() returning the byte count — confirm
            # against the Kaitai runtime in use.
            self.length = target.write_bytes(self.data)
        target.seek(self._pos)
        for field in (self.slot, self.start, self.length):
            target.write_u4le(field)
class Op(KaitaiStruct):
    """A single pickle instruction: one opcode byte plus its optional argument.

    Attributes set by `_read`:
        _raw_code:  the opcode byte as read from the stream.
        code:       `_raw_code` resolved against `PythonPickle.Opcode`.
        arg:        the argument; a `PythonPickle.NoArg` for bare opcodes, a
                    plain number for fixed-width scalars, or a nested
                    `PythonPickle` structure otherwise.
        size:       byte width of a scalar argument; stays 0 for bare opcodes
                    and for structure arguments.
        _is_string: True when the argument was read by one of the opcodes
                    flagged as text in `_STRUCT_ARGS`.
    """

    # Opcodes with a fixed-width scalar argument, keyed by Opcode member name:
    # (stream accessor suffix, argument size in bytes). The same suffix names
    # both the reader (``read_<suffix>``) and the writer (``write_<suffix>``).
    _SCALAR_ARGS = {
        "proto": ("u1", 1),
        "frame": ("u8le", 8),
        "ext1": ("u1", 1),
        "ext2": ("u2le", 2),
        "ext4": ("u4le", 4),
        "binput": ("u1", 1),
        "long_binput": ("u4le", 4),
        "binget": ("u1", 1),
        "long_binget": ("u4le", 4),
        "binint1": ("u1", 1),
        "binint2": ("u2le", 2),
        "binint": ("s4le", 4),
        "binfloat": ("f8be", 8),
    }

    # Opcodes whose argument is a nested structure, keyed by Opcode member
    # name: (class name inside PythonPickle, flag the argument as a string).
    _STRUCT_ARGS = {
        "int": ("DecimalnlShort", False),
        "long": ("DecimalnlLong", False),
        "long1": ("Long1", False),
        "long4": ("Long4", False),
        "float": ("Floatnl", False),
        "put": ("DecimalnlShort", False),
        "get": ("DecimalnlShort", False),
        "persid": ("StringnlNoescape", False),
        "global_opcode": ("StringnlNoescapePair", False),
        "inst": ("StringnlNoescapePair", False),
        "short_binbytes": ("Bytes1", False),
        "binbytes": ("Bytes4", False),
        "binbytes8": ("Bytes8", False),
        "bytearray8": ("Bytearray8", False),
        # short_binstring is not flagged as a string.
        "short_binstring": ("String1", False),
        "string": ("Stringnl", True),
        "binstring": ("String4", True),
        "unicode": ("Unicodestringnl", True),
        "short_binunicode": ("Unicodestring1", True),
        "binunicode": ("Unicodestring4", True),
        "binunicode8": ("Unicodestring8", True),
    }

    # Opcodes that carry no argument at all.
    _BARE_OPS = frozenset((
        "mark", "stop", "pop", "pop_mark", "dup", "none", "newtrue", "newfalse",
        "binpersid", "reduce", "build", "obj", "newobj", "newobj_ex",
        "stack_global", "memoize", "append", "appends", "list", "empty_list",
        "dict", "empty_dict", "setitem", "setitems", "tuple", "empty_tuple",
        "tuple1", "tuple2", "tuple3", "empty_set", "additems", "frozenset",
        "next_buffer", "readonly_buffer",
    ))

    def __init__(self, _io, _parent=None, _root=None, _init=False):
        super().__init__(_io, _parent, _root, _init)
        self._is_string = False
        if not self._init:
            return
        self._read()

    def __str__(self):
        opcode = KaitaiStream.resolve_enum(PythonPickle.Opcode, self._raw_code)
        return f"Op<{opcode.name}>"

    def _read(self):
        """Read the opcode byte and whatever argument that opcode requires.

        Raises:
            Exception: if the opcode is not one of the handled opcodes.
        """
        self._raw_code = self._io.read_u1()
        self.code = KaitaiStream.resolve_enum(PythonPickle.Opcode, self._raw_code)
        self.size = 0
        self.arg = PythonPickle.NoArg(self._io, self, self._root, self._init)

        # A resolved value without a ``name`` attribute (presumably an
        # unrecognised opcode) matches no table and ends in the error below.
        op_name = getattr(self.code, "name", None)
        if op_name in self._SCALAR_ARGS:
            suffix, width = self._SCALAR_ARGS[op_name]
            self.arg = getattr(self._io, "read_" + suffix)()
            self.size = width
        elif op_name in self._STRUCT_ARGS:
            struct_name, is_text = self._STRUCT_ARGS[op_name]
            struct_cls = getattr(PythonPickle, struct_name)
            self.arg = struct_cls(self._io, self, self._root, self._init)
            if is_text:
                self._is_string = True
        elif op_name not in self._BARE_OPS:
            raise Exception(f"Unknown opcode {self._raw_code}")

    def _write(self, stream: KaitaiStream):
        """Write the opcode byte followed by its argument, if it has one.

        Opcodes outside the scalar and structure tables write only the
        opcode byte.
        """
        stream.write_u1(self._raw_code)
        self.code = KaitaiStream.resolve_enum(PythonPickle.Opcode, self._raw_code)

        op_name = getattr(self.code, "name", None)
        scalar = self._SCALAR_ARGS.get(op_name)
        if scalar is not None:
            getattr(stream, "write_" + scalar[0])(self.arg)
        elif op_name in self._STRUCT_ARGS:
            self.arg._write(stream)
if __name__ == "__main__":
    # Command-line entry point: extract strings from .rpyc files into
    # "<name>_stringdata.txt", or reinsert (translated) strings from those
    # text files and write the rebuilt .rpyc files under the output directory.
    import argparse
    import glob
    import os
    import sys

    parser = argparse.ArgumentParser(
        description="A tool for translating Ren\'Py .rpyc files.",
        add_help=True)
    parser.add_argument("files", metavar="FILE", nargs='?', default="*.rpyc", help="Zero or more files to operate on, can be mask.")
    optgroup = parser.add_argument_group("mode").add_mutually_exclusive_group()
    optgroup.add_argument("-x", "--extract", action="store_true", help="Extract strings from .rpyc file(s).")
    optgroup.add_argument("-r", "--reinsert", action="store_true", help="Reinsert strings to .rpyc file(s).")
    parser.add_argument("-od", "--outdir", default=DEF_OUT_DIR, help="Output_directory.")
    args = parser.parse_args()

    files = glob.glob(args.files)
    if len(sys.argv) < 2 or not files:
        print(sys.argv[0])
        parser.print_help(sys.stderr)
        sys.exit(1)

    cwd = os.getcwd()
    for fn in files:
        pp = PythonPickle()
        with pp.from_file(fn) as f:
            # Drop only a trailing ".rpyc" extension; a plain str.replace
            # would also mangle directory names containing ".rpyc".
            stem, ext = os.path.splitext(fn)
            if ext != ".rpyc":
                stem = fn
            txtfn = os.path.abspath(stem + "_stringdata.txt")

            if args.extract:
                with open(txtfn, "w", encoding="utf-8") as txt:
                    print(f"Writing {txtfn} ...")
                    txt.write('\n'.join(f.strings))
            elif args.reinsert:
                with open(txtfn, "r", encoding="utf-8") as txt:
                    print(f"Reading translated {txtfn} ...")
                    new_lines = txt.read().splitlines()
                f.update_opcode_strings(new_lines)
                output = KaitaiStream(BytesIO())
                f._write(output)

                # Mirror the file's location relative to the working directory
                # under the output directory. Path arithmetic (not substring
                # replacement) keeps this correct when cwd is "/" or occurs
                # more than once in the path.
                abs_fn = os.path.abspath(fn)
                try:
                    rel = os.path.relpath(abs_fn, cwd)
                except ValueError:
                    # No relative path exists (e.g. another drive on Windows).
                    rel = os.path.basename(abs_fn)
                if rel == os.pardir or rel.startswith(os.pardir + os.sep):
                    # File lies outside cwd: place it directly in the output
                    # directory instead of overwriting the input file.
                    rel = os.path.basename(abs_fn)
                ofn = os.path.join(args.outdir, rel)

                ofn_dir = os.path.dirname(ofn)
                if ofn_dir:
                    os.makedirs(ofn_dir, exist_ok=True)
                print(f"Translating {fn.replace(cwd, '')} ... ")
                with open(ofn, "wb") as o:
                    output.seek(0)
                    o.write(output.read_bytes_full())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment