Last active
May 11, 2026 10:55
-
-
Save bitumin/bb7d37eccb557e572dd2fda1c0cd3410 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """Wartales save patcher — decode, edit resources/companions, re-encode. | |
| Two kinds of edits are supported: | |
| (1) Global resources (Gold/Influence/Happiness/etc.). Each resource is | |
| stored twice — encoded `(value*0x1F) XOR 0x5b62db6d` plus a plaintext | |
| cache. We patch both. Resources whose current value is < 128 keep a | |
| 1-byte hxbit cache (some, e.g. ActionPoint, have no encoded copy); they | |
| can only be set to another value < 128, to avoid byte-shifts. | |
| (2) Per-companion fields on every `st.Unit` instance. Schema recovered | |
| from `hlboot.dat` (see `src/st/Unit.hx::doSerialize@7494`). Editable: | |
| xp (i32) | |
| aptitudePoints (i32, "unspent" pool) | |
| level (i32) | |
| usedAptitudePoints, usedUpgrades (i32, bookkeeping) | |
| pastAttributesUps (Array<String>) — each entry is a stat name like | |
| "Strength", "Dexterity", "Constitution", "Willpower", | |
| "Movement", "CritHitPercent" etc. Each "Strength" entry | |
| contributes +1 to the displayed base Strength, and so on. | |
| --str/--dex/--con/--wil/--mov/--crit set the COUNT of those | |
| entries in this array, which shifts subsequent body bytes | |
| (re-pack handles this transparently). | |
| Companion targeting: `--companion NAME_OR_INDEX`. Names are matched | |
| case-insensitively (exact, then unique substring). Indexes are 1-based | |
| in the order companions appear in the save body. | |
| Examples: | |
| # List companions and their current state: | |
| python3 patch_save.py save005.dat --list-companions | |
| # Set Mikal's XP and add 6 Strength upgrades: | |
| python3 patch_save.py save005.dat --companion Mikal --xp 9999 --str 6 | |
| # Add 99 unspent aptitude points to companion #3: | |
| python3 patch_save.py save005.dat --companion 3 --aptitude 99 | |
| """ | |
| import argparse, hashlib, re, struct, sys, zlib | |
| from pathlib import Path | |
# --- format constants (reversed from hlboot.dat: src/st/GameState.hx) ---
CIPHER_MAGIC = 0xCAFECAFE  # XORed into every 32-bit word by cypher()
RESOURCE_KEY = 0x5b62db6d  # XOR key of the encoded resource copy
RESOURCE_MUL = 0x1F        # multiplier applied before the XOR
# Largest value whose product with RESOURCE_MUL still fits in an unsigned
# 32-bit word: 138,547,332.
RESOURCE_MAX = 0xFFFFFFFF // RESOURCE_MUL

# Resources we've confirmed can be edited, as (name in the save, CLI flag).
RESOURCES = [
    ('Gold', 'gold'),
    ('Influence', 'influence'),
    ('Happiness', 'happiness'),
    ('ActionPoint', 'valour'),  # in-game: "Valour Points"
    ('Discovery', 'discovery'),
    ('Knowledge', 'knowledge'),
]
# Derived lookups: save names in table order, save name -> CLI flag, and
# save name -> label used in printed output.
RESOURCE_NAMES = [internal for internal, _flag in RESOURCES]
RESOURCE_FLAG_FOR = dict(RESOURCES)
RESOURCE_LABEL = {
    internal: 'Valour' if internal == 'ActionPoint' else internal
    for internal in RESOURCE_NAMES
}
| # --- low-level codec --- | |
def cypher(buf: bytearray) -> None:
    """XOR-scramble `buf` in place, one little-endian 32-bit word at a time.

    Each complete word is XORed with its own byte offset and CIPHER_MAGIC.
    Applying the function twice restores the input, so it both encrypts and
    decrypts. A trailing 1-3 byte remainder is left untouched.
    """
    whole_bytes = (len(buf) // 4) * 4
    for offset in range(0, whole_bytes, 4):
        (word,) = struct.unpack_from('<I', buf, offset)
        scrambled = (word ^ offset ^ CIPHER_MAGIC) & 0xFFFFFFFF
        struct.pack_into('<I', buf, offset, scrambled)
def make_signature(data: bytes) -> bytes:
    """Return the 20-byte save signature for `data`.

    The signature is SHA-1(data) followed by mixing rounds over the digest's
    little-endian 32-bit words: the word at byte offset k is XORed with the
    word at k+8 for k = 0, 4, 8, 12, in that order and in place. The k=12
    round has no partner word inside the 20-byte digest, so it XORs with 0.
    """
    digest = bytearray(hashlib.sha1(data).digest())
    for dst in range(0, 16, 4):
        src = dst + 8
        if src + 4 <= len(digest):
            (partner,) = struct.unpack_from('<I', digest, src)
        else:
            partner = 0
        (current,) = struct.unpack_from('<I', digest, dst)
        struct.pack_into('<I', digest, dst, (current ^ partner) & 0xFFFFFFFF)
    return bytes(digest)
def encode_resource(v: int) -> int:
    """Return the encoded 32-bit form of resource value `v`.

    The value is multiplied by RESOURCE_MUL, XORed with RESOURCE_KEY and
    truncated to 32 bits.
    """
    scrambled = (v * RESOURCE_MUL) ^ RESOURCE_KEY
    return scrambled & 0xFFFFFFFF
def decode_resource(enc: int):
    """Invert encode_resource(): return the plain value, or None.

    None is returned when `enc` XOR RESOURCE_KEY is not an exact multiple of
    RESOURCE_MUL, i.e. the word cannot have come from encode_resource().
    """
    quotient, remainder = divmod(enc ^ RESOURCE_KEY, RESOURCE_MUL)
    if remainder:
        return None
    return quotient
| # --- save unpack/pack --- | |
def unpack(raw: bytes):
    """Split a save file into its parts.

    The file is a 4-byte magic (the first three bytes must be b'WTA')
    followed by length-prefixed chunks (little-endian i32 length, then the
    payload). At least three chunks are required: header, signature, body.
    If the header does not start with b'o', every chunk is treated as
    encrypted and run through cypher(). A body starting with 0x78 is treated
    as zlib-compressed and inflated.

    Returns (magic, header, signature, body, encrypted, compressed).
    Exits the process with a message on a bad magic, a corrupt chunk length,
    or fewer than three chunks.
    """
    if not raw.startswith(b'WTA'):
        sys.exit(f"not a Wartales save (magic={raw[:4]!r})")
    magic = raw[:4]

    chunks = []
    cursor = 4
    total = len(raw)
    while cursor + 4 <= total:
        (size,) = struct.unpack_from('<i', raw, cursor)
        cursor += 4
        if size < 0 or cursor + size > total:
            sys.exit(f"corrupt chunk length {size} @ {cursor-4}")
        chunks.append(bytearray(raw[cursor:cursor + size]))
        cursor += size
    if len(chunks) < 3:
        sys.exit(f"expected 3 chunks (header/sig/body), got {len(chunks)}")

    # A plaintext header starts with 'o'; anything else means encrypted.
    encrypted = chunks[0][:1] != b'o'
    if encrypted:
        for chunk in chunks:
            cypher(chunk)

    header, signature, body_raw = (bytes(chunk) for chunk in chunks[:3])
    compressed = body_raw[:1] == b'\x78'
    body = zlib.decompress(body_raw) if compressed else body_raw
    return magic, header, signature, body, encrypted, compressed
def pack(magic: bytes, header: bytes, body: bytes, encrypt: bool, compress: bool) -> bytes:
    """Assemble a save file from its parts (the inverse of unpack()).

    The signature chunk is recomputed from the uncompressed `body`. The body
    is zlib-compressed when `compress` is true, and all three chunks are run
    through cypher() when `encrypt` is true. Output is `magic` followed by
    each chunk prefixed with its little-endian i32 length.
    """
    payload = zlib.compress(body) if compress else body
    sections = [bytearray(part)
                for part in (header, make_signature(body), payload)]
    if encrypt:
        for section in sections:
            cypher(section)
    framed = (struct.pack('<i', len(section)) + bytes(section)
              for section in sections)
    return bytes(magic) + b''.join(framed)
| # --- resource location --- | |
def find_resource(body: bytes, name: str):
    """Locate the first entry for resource `name` in the save body.

    An entry is laid out as:
        <encoded> <cache> <id 5B> 0c 5c 01 <len+1><name>
    The search anchors on the `0c 5c 01 <len+1><name>` tail and then probes
    backwards for one of three layouts (`enc_form`), in this order:
      * 'xor5/c5' - 5B XOR-encoded copy + 5B cache (both start with 0x80)
      * 'xor5/c1' - 5B XOR-encoded copy + 1B cache (cache byte < 0x80)
      * 'null/c1' - a single 0x00 byte instead of the encoded copy + 1B
                    cache (seen on ActionPoint / "Valour Points")

    Returns (enc_off, cache_off, cache_width, cache_val, enc_raw, decoded,
    enc_form), or None when the name is absent or no layout matches. For
    'null/c1', enc_raw and decoded are None.
    """
    anchor = bytes((0x0c, 0x5c, 0x01, len(name) + 1)) + name.encode()
    name_idx = body.find(anchor)
    if name_idx < 0:
        return None

    # 'xor5/c5': 5B encoded + 5B cache + 5B id = 15 bytes before the anchor.
    enc_off = name_idx - 15
    cache_off = name_idx - 10
    if enc_off >= 0 and body[enc_off] == 0x80 and body[cache_off] == 0x80:
        (cache_val,) = struct.unpack_from('<i', body, cache_off + 1)
        (enc_raw,) = struct.unpack_from('<I', body, enc_off + 1)
        return (enc_off, cache_off, 5, cache_val, enc_raw,
                decode_resource(enc_raw), 'xor5/c5')

    # Both remaining layouts keep a 1-byte cache right before the 5B id.
    cache_off = name_idx - 6

    # 'xor5/c1': 5B encoded + 1B cache + 5B id = 11 bytes.
    enc_off = name_idx - 11
    if enc_off >= 0 and body[enc_off] == 0x80 and body[cache_off] < 0x80:
        (enc_raw,) = struct.unpack_from('<I', body, enc_off + 1)
        return (enc_off, cache_off, 1, body[cache_off], enc_raw,
                decode_resource(enc_raw), 'xor5/c1')

    # 'null/c1': null byte + 1B cache + 5B id = 7 bytes.
    enc_off = name_idx - 7
    if enc_off >= 0 and body[enc_off] == 0x00 and body[cache_off] < 0x80:
        return (enc_off, cache_off, 1, body[cache_off], None, None,
                'null/c1')
    return None
def patch_resource(body: bytearray, name: str, new_val: int) -> bool:
    """Overwrite resource `name` in `body` with `new_val`, in place.

    Updates the plaintext cache and, when the entry has one, the XOR-encoded
    copy. Prints one line describing the change.

    Returns True when patched, False when the resource is not present.
    Exits the process if `new_val` would need a different cache width than
    the stored value (1 byte for 0..127, 5 bytes otherwise), because that
    would shift every byte after it.
    """
    label = RESOURCE_LABEL.get(name, name)
    located = find_resource(bytes(body), name)
    if located is None:
        print(f" {label:>12}: not found in this save (skipping)")
        return False
    enc_off, cache_off, cache_width, old_val, old_enc, _decoded, layout = located

    wanted_width = 5 if (new_val < 0 or new_val >= 0x80) else 1
    if wanted_width != cache_width:
        sys.exit(
            f"refusing to patch {label}: {old_val} → {new_val} would change "
            f"the cache encoding width ({cache_width}B → {wanted_width}B), shifting "
            f"every byte after it. (Pick a value on the same side of 128, "
            f"or add explicit byte-shift support if you need to cross it.)")

    if layout == 'null/c1':
        # No XOR-encoded copy exists; the leading null byte stays as-is.
        body[cache_off] = new_val
        print(f" {label:>12}: {old_val:>10} → {new_val:<10} "
              f"(no XOR copy; cache 1B)")
        return True

    new_enc = encode_resource(new_val)
    body[enc_off:enc_off + 5] = b'\x80' + struct.pack('<I', new_enc)
    if cache_width == 5:
        body[cache_off:cache_off + 5] = b'\x80' + struct.pack('<i', new_val)
    else:
        body[cache_off] = new_val
    print(f" {label:>12}: {old_val:>10} → {new_val:<10} "
          f"(cache {cache_width}B; enc 0x{old_enc:08x} → 0x{new_enc:08x})")
    return True
| # --- hxbit varint / string codec (for companion fields) --- | |
| # addInt: 0..127 → 1 byte; otherwise → 0x80 then 4-byte LE i32. | |
| # addString: addInt(len+1) then UTF-8 bytes; 0 means null, 1 means "". | |
def varint_width(v: int) -> int:
    """Return the encoded size of `v` in bytes: 1 for 0..127, otherwise 5."""
    if v < 0 or v >= 0x80:
        return 5
    return 1
def encode_varint(v: int) -> bytes:
    """Encode `v` as an hxbit int.

    Values 0..127 become a single byte; anything else becomes the marker
    0x80 followed by the value as a little-endian signed 32-bit integer.
    """
    if v < 0 or v > 0x7F:
        return b'\x80' + struct.pack('<i', v)
    return bytes((v,))
def read_varint(b: bytes, off: int):
    """Decode one hxbit int at `off`.

    Returns (value, new_offset). A first byte below 0x80 is the value
    itself; 0x80 introduces a little-endian signed 32-bit value. Any other
    first byte raises ValueError.
    """
    marker = b[off]
    if marker == 0x80:
        (value,) = struct.unpack_from('<i', b, off + 1)
        return value, off + 5
    if marker > 0x80:
        raise ValueError(f"unexpected hxbit varint marker 0x{marker:02x} at 0x{off:x}")
    return marker, off + 1
def encode_string(s):
    """Encode `s` as an hxbit string.

    None becomes a single 0x00 byte. Any other string becomes
    encode_varint(byte_length + 1) followed by its UTF-8 bytes, so the empty
    string is the single byte 0x01.
    """
    if s is not None:
        payload = s.encode('utf-8')
        return encode_varint(len(payload) + 1) + payload
    return b'\x00'
def read_string(b: bytes, off: int):
    """Decode one hxbit string at `off`.

    The string is stored as an hxbit int holding byte_length + 1 (0 means
    null, 1 means the empty string) followed by that many UTF-8 bytes.
    Invalid UTF-8 is decoded with replacement characters.

    Returns (string-or-None, new_offset).

    Raises ValueError when the stored length is negative or the string runs
    past the end of `b`. Previously a negative length yielded '' with an
    offset that moved backwards, and a truncated string yielded a short
    value with an offset beyond the buffer. Errors from read_varint() are
    propagated unchanged.
    """
    plus1, start = read_varint(b, off)
    if plus1 == 0:
        return None, start
    if plus1 < 0:
        raise ValueError(
            f"negative hxbit string length {plus1 - 1} at 0x{off:x}")
    length = plus1 - 1
    end = start + length
    if end > len(b):
        raise ValueError(
            f"truncated hxbit string at 0x{off:x}: needs {length} bytes, "
            f"only {len(b) - start} available")
    return b[start:end].decode('utf-8', errors='replace'), end
| # --- companion location --- | |
# Every st.Unit instance is preceded by its UID, written as 4 raw bytes.
# In the save this was derived from, the three high bytes are the same for
# all companions (`91 6a 02`), so a companion can be located by this marker
# followed by `[len+1] [name]`.
COMPANION_MARKER = bytes((0x91, 0x6a, 0x02))
def enumerate_unit_names(body: bytes):
    """Yield (name_offset, name) for every plausible st.Unit in `body`.

    A candidate is COMPANION_MARKER followed by a length-prefixed name.
    `name_offset` points at the length byte. Only names of 1..30 printable
    ASCII characters are accepted, to avoid matching arbitrary bytes that
    happen to follow the marker.
    """
    marker_len = len(COMPANION_MARKER)
    search_from = 0
    while True:
        hit = body.find(COMPANION_MARKER, search_from)
        if hit < 0:
            return
        # Resume after this match, mirroring a non-overlapping scan.
        off = search_from = hit + marker_len
        if off >= len(body):
            continue
        plus1 = body[off]
        if not 2 <= plus1 <= 31:  # length byte is len+1, so 1..30 chars
            continue
        raw_name = body[off + 1:off + plus1]
        if any(c < 0x20 or c >= 0x7f for c in raw_name):
            continue
        yield off, raw_name.decode('utf-8', errors='replace')
# --- st.Unit.upgrades array (the actual stat-bonus source) ---
# Every entry has the shape `{a: AttributeKind (string), base: Bool, v: Int}`.
# UnitStats.load folds them in through `addBonus(stats, a, v)`, which makes
# the displayed base stat for attribute X equal to
# `class_default(X) + sum(entry.v where entry.a == X)`.
#
# CLI key -> AttributeKind enum name:
BASE_STAT_KEYS = dict(
    str='Strength',
    dex='Dexterity',
    con='Constitution',
    wil='Willpower',
    mov='Movement',
    crit='CritHitPercent',
)
# AttributeKind names accepted as `a` when parsing `upgrades`; used as a
# heuristic filter to recognise where the upgrades array starts.
KNOWN_ATTRS = set(BASE_STAT_KEYS.values()) | {
    'CritHitDamageBonusPercent', 'MaxHealth', 'Armor',
    'Guard', 'VisionRange', 'Morale', 'Transport',
    'DamageBonusPercent', 'DamageReducePercent',
    'DamageBonusOppAttack', 'DamageBonusCrossbow',
    'DistanceBonus', 'PrecisionBonus',
}
def _looks_like_skill(s):
    """Heuristic check that `s` could be a skill identifier.

    Accepts strings of 2..30 printable ASCII characters that start with a
    letter and are alphanumeric once underscores are removed. None and the
    empty string are rejected.
    """
    if not s:
        return False
    if not 2 <= len(s) <= 30:
        return False
    printable = all(' ' <= ch <= '~' for ch in s)
    return printable and s[0].isalpha() and s.replace('_', '').isalnum()
def _try_parse_upgrades_block(body, off, origin_off):
    """Try to parse `overrideUnitClass + upgrades + learntSkills + status +
    skillLevels` starting at `off`.

    This is a validator as much as a parser: the caller probes many offsets
    and keeps the first one accepted here. A candidate is accepted only if
    every field parses, every value passes the plausibility checks below,
    `status` and `skillLevels` are both empty arrays (single byte 0x01), and
    the parse ends at or before `origin_off`.

    Args:
        body: the decoded save body.
        off: candidate offset of the overrideUnitClass string.
        origin_off: offset of this unit's originRegion string, used as an
            upper bound for where the block may end.

    Returns:
        A dict with keys 'overrideUnitClass', 'upgrades_lp1_off' (offset of
        the upgrades length prefix), 'upgrades_end_off' (offset just past
        the last upgrades entry), 'upgrades' (list of
        {'a', 'base', 'v'} dicts) and 'learntSkills' (list of str), or None
        if the candidate is rejected.
    """
    cur = off
    # overrideUnitClass: nullable string, at most 30 printable ASCII chars.
    try: ouc, cur = read_string(body, cur)
    except Exception: return None
    if ouc is not None and (len(ouc) > 30 or
                            not all(0x20 <= ord(c) < 0x7f for c in ouc)):
        return None
    # upgrades array: length+1 prefix (0 is accepted and yields no entries).
    upgrades_lp1_off = cur
    try: up_lp1, cur = read_varint(body, cur)
    except Exception: return None
    if not (0 <= up_lp1 <= 20): return None
    upgrades = []
    for _ in range(max(0, up_lp1 - 1)):
        # Each entry starts with bitmask+1; bit 0 set means `a` is present.
        try: bm_p1, cur = read_varint(body, cur)
        except Exception: return None
        bm = bm_p1 - 1
        if bm not in (0, 1): return None
        a = None
        if bm & 1:
            try: a, cur = read_string(body, cur)
            except Exception: return None
            if a not in KNOWN_ATTRS: return None
        # `base` is a single 0/1 byte.
        if cur >= len(body): return None
        bb = body[cur]; cur += 1
        if bb not in (0, 1): return None
        # `v` must be a small bonus value.
        try: v, cur = read_varint(body, cur)
        except Exception: return None
        if not (-1000 <= v <= 1000): return None
        upgrades.append({'a': a, 'base': bool(bb), 'v': v})
    upgrades_end = cur
    # learntSkills: non-null array of at most 29 identifier-like strings.
    try: sk_lp1, cur = read_varint(body, cur)
    except Exception: return None
    if not (1 <= sk_lp1 <= 30): return None
    skills = []
    for _ in range(max(0, sk_lp1 - 1)):
        try: s, cur = read_string(body, cur)
        except Exception: return None
        if not _looks_like_skill(s): return None
        skills.append(s)
    # status (require empty for this heuristic — matches observed saves)
    if cur >= len(body) or body[cur] != 0x01: return None
    cur += 1
    # skillLevels (require empty)
    if cur >= len(body) or body[cur] != 0x01: return None
    cur += 1
    # The block must end at or before originRegion (which is past skinData).
    if cur > origin_off: return None
    # First skinData byte is a Dynamic tag — should be a small int.
    if not (0 <= body[cur] <= 15): return None
    return {
        'overrideUnitClass': ouc,
        'upgrades_lp1_off': upgrades_lp1_off,
        'upgrades_end_off': upgrades_end,
        'upgrades': upgrades,
        'learntSkills': skills,
    }
def find_unit_upgrades(body, name_off, origin_off, search_window=300):
    """Locate the upgrades array for the unit whose name starts at `name_off`.

    Walks past name/woman/kind/flags exactly, then heuristically scans
    forward over the variable-length `assignedTool` reference. The first
    offset within `search_window` bytes that `_try_parse_upgrades_block`
    accepts (which also requires it to end at or before `origin_off`) is
    taken as the start of the block.

    Returns the dict from `_try_parse_upgrades_block` with an extra
    'assignedTool_skip' key (bytes skipped after `flags`), or None when
    nothing matches. This lookup is best-effort: if the fixed header fields
    cannot be decoded, None is returned instead of letting the decoding
    error escape.
    """
    try:
        _, cur = read_string(body, name_off)   # name
        cur += 1                               # woman (1 byte bool)
        _, cur = read_string(body, cur)        # kind
        _, flags_end = read_varint(body, cur)  # flags (i32)
    except (ValueError, IndexError, struct.error):
        # Not a decodable unit header, so there is nothing to scan from.
        return None

    for delta in range(search_window):
        candidate = flags_end + delta
        if candidate >= len(body):
            break
        info = _try_parse_upgrades_block(body, candidate, origin_off)
        if info is not None:
            info['assignedTool_skip'] = delta
            return info
    return None
def _bounded_varint(body, off, lo, hi):
    """Read one varint at `off`; raise ValueError unless lo <= value <= hi."""
    value, nxt = read_varint(body, off)
    if not lo <= value <= hi:
        raise ValueError(f"varint {value} at 0x{off:x} outside [{lo}, {hi}]")
    return value, nxt


def _parse_companion_block_at(body, name_off, off):
    """Parse the field block whose originRegion string starts at `off`.

    Returns the block dict; raises ValueError, IndexError or struct.error
    when the bytes at `off` do not form a plausible block.
    """
    origin, cur = read_string(body, off)
    if not origin or not 3 <= len(origin) <= 30:
        raise ValueError("implausible originRegion length")
    if not all(0x20 <= ord(c) < 0x7f for c in origin):
        raise ValueError("originRegion is not printable ASCII")

    # skinTextureIndex (null<i32>): presence flag, then the value if set.
    flag = body[cur]
    cur += 1
    if flag not in (0, 1):
        raise ValueError("bad skinTextureIndex flag")
    if flag == 1:
        _, cur = read_varint(body, cur)

    level_off = cur
    level, cur = _bounded_varint(body, cur, 0, 100)
    ap_off = cur
    ap, cur = _bounded_varint(body, cur, 0, 100)

    # aptitudePointsPreselect: require an empty array (len+1 == 1 byte).
    ap_pre_off = cur
    if body[cur] != 0x01:
        raise ValueError("aptitudePointsPreselect is not an empty array")
    cur += 1

    usedAp_off = cur
    usedAp, cur = _bounded_varint(body, cur, 0, 100)
    usedUpg_off = cur
    usedUpg, cur = _bounded_varint(body, cur, 0, 100)
    xp_off = cur
    xp, cur = _bounded_varint(body, cur, 0, 10_000_000)

    # pastAttributesUps: len+1 prefix, then that many strings. A prefix of
    # 0 (null array) is treated like an empty array.
    paUps_lp1_off = cur
    paUps_lp1, cur = read_varint(body, cur)
    count = max(0, paUps_lp1 - 1)
    # Every entry needs at least one byte, so a negative prefix or a count
    # larger than the remaining data cannot be a real array.
    if paUps_lp1 < 0 or count > len(body) - cur:
        raise ValueError("implausible pastAttributesUps length")
    paUps_strings_off = cur
    paUps = []
    for _ in range(count):
        s, nxt = read_string(body, cur)
        if s is None or len(s) > 50 or nxt <= cur:
            raise ValueError("implausible pastAttributesUps entry")
        paUps.append(s)
        cur = nxt

    return {
        'name_off': name_off,
        'origin_region': origin,
        'origin_off': off,
        'level': level, 'level_off': level_off,
        'ap': ap, 'ap_off': ap_off,
        'ap_preselect_off': ap_pre_off,
        'usedAp': usedAp, 'usedAp_off': usedAp_off,
        'usedUpg': usedUpg, 'usedUpg_off': usedUpg_off,
        'xp': xp, 'xp_off': xp_off,
        'paUps_lp1_off': paUps_lp1_off,
        'paUps_strings_off': paUps_strings_off,
        'paUps': paUps,
        'paUps_end_off': cur,
    }


def find_companion_block(body: bytes, name_off: int, search_limit: int = 1500):
    """Locate the field block (originRegion .. pastAttributesUps) for one Unit.

    Schema (post-skinData) per Unit.hx::doSerialize:
        addString(originRegion)
        skinTextureIndex (null<i32>)  -- 1 byte flag, then optional varint
        addInt(level)
        addInt(aptitudePoints)
        aptitudePointsPreselect.length+1  (must be 1: empty array)
        addInt(usedAptitudePoints)
        addInt(usedUpgrades)
        addInt(xp)
        pastAttributesUps  array of strings

    Every offset in (name_off, name_off + search_limit) is tried as the
    start of originRegion; the first one whose whole block parses with
    plausible values wins.

    Returns a dict with the parsed values and their offsets ('level',
    'level_off', 'ap', 'ap_off', 'ap_preselect_off', 'usedAp', 'usedAp_off',
    'usedUpg', 'usedUpg_off', 'xp', 'xp_off', 'paUps', 'paUps_lp1_off',
    'paUps_strings_off', 'paUps_end_off', plus 'name_off', 'origin_region',
    'origin_off'), or None if no plausible block is found.
    """
    end = min(len(body), name_off + search_limit)
    for off in range(name_off + 1, end):
        try:
            return _parse_companion_block_at(body, name_off, off)
        except (ValueError, IndexError, struct.error):
            # Only decoding/plausibility failures mean "try the next
            # offset"; KeyboardInterrupt and SystemExit now propagate.
            continue
    return None
def enumerate_companions(body: bytes):
    """Return one dict per st.Unit found in `body`, in body order.

    Each dict is the post-skinData block from find_companion_block()
    (level/ap/xp/paUps and their offsets) extended with 'name', 'kind',
    'woman', the pre-skinData upgrades data ('upgrades_lp1_off',
    'upgrades_end_off', 'upgrades', 'learntSkills', all None when the
    upgrades array could not be located) and a 1-based 'idx'. Name
    candidates without a plausible field block are skipped.
    """
    upgrade_keys = ('upgrades_lp1_off', 'upgrades_end_off',
                    'upgrades', 'learntSkills')
    companions = []
    for name_off, name in enumerate_unit_names(body):
        block = find_companion_block(body, name_off)
        if block is None:
            continue
        block['name'] = name

        # The length byte holds len+1, so the `woman` byte sits right there.
        try:
            woman_off = name_off + body[name_off]
            woman = body[woman_off]
            kind, _ = read_string(body, woman_off + 1)
        except Exception:
            kind, woman = '?', 0
        block['kind'] = kind or ''
        block['woman'] = bool(woman)

        # Best-effort: the heuristic depends on status/skillLevels being
        # empty, which holds for every companion in the saves tested.
        ups = find_unit_upgrades(body, name_off, block['origin_off'])
        for key in upgrade_keys:
            block[key] = None if ups is None else ups[key]

        block['idx'] = len(companions) + 1
        companions.append(block)
    return companions
def _encode_upgrade_entry(entry):
    """Serialize one upgrades-array entry.

    Layout: bitmask+1 (bit 0 set when `a` is present), then addString(a) if
    present, then addBool(base) as one byte, then addInt(v).
    """
    attr = entry.get('a')
    has_attr = attr is not None
    parts = [encode_varint(2 if has_attr else 1)]   # bitmask + 1
    if has_attr:
        parts.append(encode_string(attr))            # addString(a)
    parts.append(b'\x01' if entry['base'] else b'\x00')  # addBool(base)
    parts.append(encode_varint(int(entry['v'])))     # addInt(v)
    return b''.join(parts)
def rewrite_upgrades(body: bytearray, comp: dict, new_list):
    """Replace the unit's upgrades array (length prefix + entries) in `body`.

    Bytes after the array shift by the size difference, which is returned.
    `comp['upgrades']` and `comp['upgrades_end_off']` are updated; other
    offsets stored in `comp` are not adjusted. Exits the process if the
    upgrades array was never located for this companion.
    """
    start = comp['upgrades_lp1_off']
    if start is None:
        sys.exit(f"upgrades array not located for {comp['name']}; cannot patch")
    old_end = comp['upgrades_end_off']

    encoded_entries = [_encode_upgrade_entry(entry) for entry in new_list]
    replacement = encode_varint(len(new_list) + 1) + b''.join(encoded_entries)
    body[start:old_end] = replacement

    new_end = start + len(replacement)
    comp['upgrades'] = list(new_list)
    comp['upgrades_end_off'] = new_end
    return new_end - old_end
def set_upgrade_for_attr(body: bytearray, comp: dict, attr_label: str, target_v: int):
    """Make the `upgrades` total for `attr_label` exactly `target_v`.

    All existing entries for that attribute are dropped and, unless
    `target_v` is 0, a single `{a: attr_label, base: True, v: target_v}`
    entry is appended. Entries for other attributes keep their order.
    Prints the old and new totals.
    """
    previous_total = 0
    kept = []
    for entry in comp.get('upgrades') or []:
        if entry.get('a') == attr_label:
            previous_total += int(entry['v'])
        else:
            kept.append(entry)
    if target_v != 0:
        kept.append({'a': attr_label, 'base': True, 'v': int(target_v)})

    rewrite_upgrades(body, comp, kept)
    print(f" {comp['name']:>10}.upgrades.{attr_label:<14}: "
          f"{previous_total:+d} → {target_v:+d} "
          f"(displayed base = class_default {target_v:+d})")
def resolve_companion(companions, identifier: str):
    """Return the companion dict selected by `identifier`.

    An all-digit identifier is a 1-based index into `companions`. Anything
    else is matched against names case-insensitively: an exact match first,
    then a substring match. Exits the process when the index is out of
    range, when a name matches more than one companion, or when nothing
    matches.
    """
    if identifier.isdigit():
        position = int(identifier)
        if not 1 <= position <= len(companions):
            sys.exit(f"--companion: index {position} out of range (1..{len(companions)})")
        return companions[position - 1]

    needle = identifier.lower()
    strategies = (
        lambda lowered: lowered == needle,   # exact name
        lambda lowered: needle in lowered,   # substring
    )
    for matches in strategies:
        hits = [c for c in companions if matches(c['name'].lower())]
        if len(hits) == 1:
            return hits[0]
        if hits:
            names = ', '.join(c['name'] for c in hits)
            sys.exit(f"--companion {identifier!r}: ambiguous (matches: {names})")

    names = ', '.join(c['name'] for c in companions)
    sys.exit(f"--companion {identifier!r}: not found. Available: {names}")
def patch_companion_int(body: bytearray, comp: dict, field: str, new_val: int):
    """Overwrite one integer field of a companion in place.

    `field` is one of 'level', 'ap', 'usedAp', 'usedUpg' or 'xp'. The patch
    is refused (process exit) when the new value would be encoded with a
    different width than the current one (1 byte vs 5 bytes), so every
    other offset stays valid. Prints the change and returns True. `comp`
    itself is not updated with the new value.
    """
    offset_keys = {
        'level': 'level_off',
        'ap': 'ap_off',
        'usedAp': 'usedAp_off',
        'usedUpg': 'usedUpg_off',
        'xp': 'xp_off',
    }
    off = comp[offset_keys[field]]
    old_val = comp[field]
    old_width = varint_width(old_val)
    new_width = varint_width(new_val)
    if old_width != new_width:
        sys.exit(
            f"refusing to patch {comp['name']}.{field}: {old_val}→{new_val} "
            f"would change varint width ({old_width}B→{new_width}B), shifting subsequent "
            f"bytes. (Pick a value on the same side of 128, or do an explicit "
            f"two-step patch.)")
    body[off:off + old_width] = encode_varint(new_val)
    print(f" {comp['name']:>10}.{field:<7}: {old_val:>8} → {new_val:<8}")
    return True
def rewrite_pastAttributesUps(body: bytearray, comp: dict, new_list):
    """Replace the whole pastAttributesUps array with `new_list` (strings).

    Bytes after the array shift by the size difference, which is returned.
    `comp['paUps']` and `comp['paUps_end_off']` are updated to match.
    """
    start = comp['paUps_lp1_off']
    old_end = comp['paUps_end_off']

    encoded_strings = [encode_string(item) for item in new_list]
    replacement = encode_varint(len(new_list) + 1) + b''.join(encoded_strings)
    body[start:old_end] = replacement

    new_end = start + len(replacement)
    comp['paUps'] = list(new_list)
    comp['paUps_end_off'] = new_end
    return new_end - old_end
def set_base_stat_count(body: bytearray, comp: dict, stat_key: str, target_count: int):
    """Set how many `BASE_STAT_KEYS[stat_key]` entries pastAttributesUps holds.

    Existing entries for that attribute are removed and `target_count` fresh
    ones are appended after the remaining entries, whose order is kept.
    Prints the old and new counts and returns the byte-size delta reported
    by rewrite_pastAttributesUps().
    """
    label = BASE_STAT_KEYS[stat_key]
    previous = list(comp['paUps'])
    kept = [item for item in previous if item != label]
    old_count = len(previous) - len(kept)
    new_list = kept + [label] * target_count

    delta = rewrite_pastAttributesUps(body, comp, new_list)
    print(f" {comp['name']:>10}.{stat_key:<7}: {label} ×{old_count} → ×{target_count} "
          f"(paUps now {new_list})")
    return delta
| # --- CLI --- | |
def resource_int(s: str) -> int:
    """Argparse `type=` callable for resource values.

    Parses `s` as an integer and requires 0 <= value <= RESOURCE_MAX.
    Raises argparse.ArgumentTypeError otherwise.
    """
    try:
        value = int(s)
    except ValueError:
        raise argparse.ArgumentTypeError(f"not an integer: {s!r}")
    if not 0 <= value <= RESOURCE_MAX:
        raise argparse.ArgumentTypeError(
            f"value {value} out of range [0, {RESOURCE_MAX:,}]; encoded form would overflow u32")
    return value
def small_int(s: str) -> int:
    """Argparse `type=` callable that accepts any integer >= 0.

    Raises argparse.ArgumentTypeError for non-integers and negative values.
    """
    try:
        value = int(s)
    except ValueError:
        raise argparse.ArgumentTypeError(f"not an integer: {s!r}")
    if value >= 0:
        return value
    raise argparse.ArgumentTypeError(f"value {value} must be >= 0")
def _add_companion_flags(p):
    """Register the companion-target flag and the per-companion field flags.

    Adds --companion, the non-negative counters (--xp, --aptitude, --level,
    --used-aptitude, --used-upgrades) and the base-stat flags (--str, --dex,
    --con, --wil, --mov, --crit; stored as base_<key>) to parser `p`.
    """
    p.add_argument(
        '--companion', metavar='NAME_OR_INDEX',
        help='target companion (1-based index or case-insensitive name). '
             'Repeat to patch multiple companions in one invocation; flags '
             'after each --companion apply to it until the next --companion.')

    counter_help = {'--aptitude': 'set unspent aptitude points'}
    for flag in ('--xp', '--aptitude', '--level',
                 '--used-aptitude', '--used-upgrades'):
        p.add_argument(flag, type=small_int, metavar='N',
                       help=counter_help.get(flag))

    # Base-stat flags write to `st.Unit.upgrades` (what UnitStats.load()
    # folds into displayed stats via addBonus). Each flag SETS the
    # cumulative bonus for its attribute to exactly N, so the displayed
    # in-game base is class_default + N. They use plain `int`, so negative
    # values (below the class default) are accepted.
    stat_flags = (
        ('str', 'Strength'),
        ('dex', 'Dexterity'),
        ('con', 'Constitution'),
        ('wil', 'Willpower'),
        ('mov', 'Movement'),
        ('crit', 'CritHitPercent'),
    )
    for key, attr in stat_flags:
        text = f'set {attr} upgrades-bonus to +N'
        if key == 'str':
            text += ' (displayed base = class_default + N)'
        p.add_argument(f'--{key}', dest=f'base_{key}', type=int,
                       metavar='N', help=text)
# Argparse dests created by _add_companion_flags, split into the integer
# counters and the base-stat targets. Used to detect an empty companion
# group (--companion with no field flags).
_COMPANION_INT_DESTS = ['level', 'aptitude', 'used_aptitude', 'used_upgrades', 'xp']
_COMPANION_STAT_DESTS = [
    f'base_{key}' for key in ('str', 'dex', 'con', 'wil', 'mov', 'crit')
]

# Flag spellings used to classify raw argv tokens: global switches that take
# no value, global flags followed by a value, and companion flags (each of
# which is followed by a value).
_GLOBAL_FLAGS_NO_VALUE = {'--list-companions'}
_GLOBAL_FLAGS_WITH_VALUE = {'--output', '-o'}.union(
    f'--{flag}' for _internal, flag in RESOURCES)
_COMPANION_FLAGS_WITH_VALUE = {
    '--companion',
    '--xp', '--aptitude', '--level', '--used-aptitude', '--used-upgrades',
    '--str', '--dex', '--con', '--wil', '--mov', '--crit',
}
def _split_companion_groups(argv):
    """Separate global arguments from per-companion argument groups.

    Global flags are pulled out from anywhere in `argv`; the remaining
    companion tokens are split into one group per `--companion`. This lets
    the user write `-o file.dat` (or any global flag) before, between, or
    after companion groups.

    Both `--flag value` and `--flag=value` spellings are understood.
    `--companion=NAME` is normalised to the two tokens `--companion NAME`
    so it starts a group like the spaced form does. Positionals and unknown
    flags (including unknown `--flag=value` tokens) go to the global list so
    the global parser can report them.

    Returns (global_argv, [companion_argv, ...]); each companion_argv starts
    with '--companion'. Exits the process when companion field flags appear
    without, or before, a `--companion`.
    """
    global_argv = []
    rest = []
    i = 0
    n = len(argv)
    while i < n:
        a = argv[i]
        # Long-form `--flag=value`: classify by the part before '='.
        if a.startswith('--') and '=' in a:
            stem, value = a.split('=', 1)
            if stem == '--companion':
                # Group detection below looks for a bare '--companion'.
                rest.extend((stem, value))
            elif stem in _COMPANION_FLAGS_WITH_VALUE:
                rest.append(a)
            else:
                global_argv.append(a)
            i += 1
            continue
        if a in _GLOBAL_FLAGS_WITH_VALUE or a in _COMPANION_FLAGS_WITH_VALUE:
            bucket = global_argv if a in _GLOBAL_FLAGS_WITH_VALUE else rest
            bucket.append(a)
            # Take the value unconditionally so negative numbers such as
            # `--str -3` are not mistaken for flags.
            if i + 1 < n:
                bucket.append(argv[i + 1])
            i += 2
            continue
        # Value-less global switch, positional (the input .dat) or unknown
        # flag: all are left for the global parser.
        global_argv.append(a)
        i += 1

    # Split the companion-side tokens on each --companion.
    starts = [k for k, tok in enumerate(rest) if tok == '--companion']
    if not starts:
        if rest:
            stray = [t for t in rest if t.startswith('-')]
            sys.exit(f"companion patches require --companion NAME_OR_INDEX "
                     f"(saw {' '.join(stray) or rest})")
        return global_argv, []
    if starts[0] != 0:
        stray = rest[:starts[0]]
        sys.exit(f"companion-field flags before --companion: {' '.join(stray)} "
                 f"(put them after a --companion target)")
    bounds = starts[1:] + [len(rest)]
    groups = [rest[lo:hi] for lo, hi in zip(starts, bounds)]
    return global_argv, groups
| def _extract_companion_patches(group_args): | |
| """Collect non-None companion-field patches from a parsed group namespace.""" | |
| int_fields = { | |
| 'level': group_args.level, | |
| 'ap': group_args.aptitude, | |
| 'usedAp': group_args.used_aptitude, | |
| 'usedUpg':group_args.used_upgrades, | |
| 'xp': group_args.xp, | |
| } | |
| int_fields = {k: v for k, v in int_fields.items() if v is not None} | |
| stat_targets = { | |
| 'str': group_args.base_str, | |
| 'dex': group_args.base_dex, | |
| 'con': group_args.base_con, | |
| 'wil': group_args.base_wil, | |
| 'mov': group_args.base_mov, | |
| 'crit': group_args.base_crit, | |
| } | |
| stat_targets = {k: v for k, v in stat_targets.items() if v is not None} | |
| return int_fields, stat_targets | |
def _main_print_resources(body):
    """Print the current value of every known resource found in `body`."""
    print("\nResources currently in save:")
    for internal, _ in RESOURCES:
        label = RESOURCE_LABEL[internal]
        info = find_resource(body, internal)
        if not info:
            print(f" {label:>12}: not present")
            continue
        _, _, cache_width, cache_val, _, decoded, enc_form = info
        if enc_form == 'null/c1':
            tail = '(no XOR copy; 1B cache) ✓'
        else:
            consistent = '✓' if decoded == cache_val else '✗ (cache/encoded mismatch)'
            tail = f"encoded → {decoded} ({cache_width}B cache) {consistent}"
        print(f" {label:>12}: {cache_val:>10} {tail}")


def _main_print_companions(body):
    """Print one table row per companion found in `body`, plus a legend."""
    comps = enumerate_companions(body)
    print(f"\nCompanions ({len(comps)} found):")
    print(f" {'#':>2} {'name':<14} {'kind':<14} "
          f"{'lvl':>3} {'ap':>3} {'used':>5} {'upg':>3} {'xp':>7} upgrades-bonuses")
    for c in comps:
        ups = c.get('upgrades')
        if ups is None:
            upg_str = '<could not locate>'
        elif not ups:
            upg_str = '—'
        else:
            upg_str = ', '.join(
                f"{e['a']}{int(e['v']):+d}" + ('*' if not e['base'] else '')
                for e in ups)
        print(f" {c['idx']:>2} {c['name']:<14} {c['kind']:<14} "
              f"{c['level']:>3} {c['ap']:>3} {c['usedAp']:>5} {c['usedUpg']:>3} "
              f"{c['xp']:>7} {upg_str}")
    print(" (upgrades-bonuses: each `Attr+N` = +N to that base stat. "
          "Trailing `*` = base:false (item/effect modifier).)")


def _main_verify_round_trip(rt_body, res_patches, resolved_targets):
    """Check that every requested patch can be read back from `rt_body`.

    Args:
        rt_body: body decoded from the freshly written output.
        res_patches: {internal resource name: requested value}.
        resolved_targets: [(idx, name, int_fields, stat_targets), ...] for
            the companions that were patched.

    Returns:
        True when every requested value was found and matches.
    """
    print("\nVerify (re-decoding the output):")
    all_ok = True
    for r, v in res_patches.items():
        label = RESOURCE_LABEL[r]
        info = find_resource(rt_body, r)
        if not info:
            # A requested patch that cannot be read back is a failure, not
            # something to skip silently.
            print(f" {label:>12}: not found after round-trip ✗")
            all_ok = False
            continue
        _, _, _, cache_val, _, decoded, enc_form = info
        if enc_form == 'null/c1':
            ok = (cache_val == v)
            detail = f"cache={cache_val} (no XOR copy)"
        else:
            ok = (cache_val == v == decoded)
            detail = f"cache={cache_val} encoded→{decoded}"
        if not ok:
            all_ok = False
        print(f" {label:>12}: {detail} {'✓' if ok else '✗'}")
    if resolved_targets:
        rt_comps = enumerate_companions(rt_body)
        for idx, name, int_fields, stat_targets in resolved_targets:
            # Prefer the companion at the same position: names need not be
            # unique, and a name-only lookup would check the first match.
            rt = next((c for c in rt_comps
                       if c['idx'] == idx and c['name'] == name), None)
            if rt is None:
                rt = next((c for c in rt_comps if c['name'] == name), None)
            if rt is None:
                print(f" {name}: NOT FOUND after round-trip ✗")
                all_ok = False
                continue
            for field, expected in int_fields.items():
                got = rt[field]
                ok = got == expected
                if not ok:
                    all_ok = False
                print(f" {name}.{field}: {got} {'✓' if ok else '✗ (expected '+str(expected)+')'}")
            for stat_key, expected in stat_targets.items():
                label = BASE_STAT_KEYS[stat_key]
                got = sum(int(e['v']) for e in (rt.get('upgrades') or [])
                          if e.get('a') == label)
                ok = got == expected
                if not ok:
                    all_ok = False
                print(f" {name}.upgrades.{label}: {got:+d} "
                      f"{'✓' if ok else '✗ (expected '+str(expected)+')'}")
    return all_ok


def main():
    """Command-line entry point: parse arguments, patch the save, verify.

    Reads the input save, prints its resources (and companions when
    `--list-companions` is given), applies the requested resource and
    companion patches, writes the output file, then re-decodes that file to
    confirm every requested value.

    Raises:
        SystemExit: on bad arguments, a missing input file, an unlocatable
            upgrades array, or a failed round-trip verification.
    """
    # Global parser owns input file, --output, resources, --list-companions.
    # Companion flags are also added so `--help` shows them all in one place,
    # but at runtime we hand them to the per-group sub-parser instead.
    p = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument('input', help='input save .dat')
    p.add_argument('--output', '-o',
                   help='output file (default: <input>_patched.dat)')
    for internal, flag in RESOURCES:
        label = RESOURCE_LABEL[internal]
        p.add_argument(f'--{flag}', dest=f'res_{flag}',
                       type=resource_int, metavar='N',
                       help=f'set {label} to N (0..{RESOURCE_MAX:,})')
    p.add_argument('--list-companions', action='store_true',
                   help='print all companions and their current state')
    _add_companion_flags(p)  # for --help only; consumed by the sub-parser at runtime

    # Split sys.argv into globals + one chunk per --companion.
    raw_argv = sys.argv[1:]
    global_argv, comp_groups = _split_companion_groups(raw_argv)

    # Parse globals against the global parser. Fully spelled companion flags
    # never reach it, so its companion dests are expected to stay unset.
    args = p.parse_args(global_argv)

    # argparse accepts unique prefixes, so an abbreviated companion flag such
    # as `--x 5` is treated as an unknown flag by the splitter, lands here,
    # and would otherwise be parsed and then silently ignored.
    field_dests = ('xp', 'aptitude', 'level', 'used_aptitude', 'used_upgrades',
                   'base_str', 'base_dex', 'base_con', 'base_wil', 'base_mov',
                   'base_crit')
    leaked = [d for d in field_dests if getattr(args, d, None) is not None]
    if getattr(args, 'companion', None):
        leaked.insert(0, 'companion')
    if leaked:
        p.error("companion flags must be spelled out in full (no abbreviations); "
                f"these would have been ignored: {', '.join(leaked)}")

    # Build a sub-parser used for each --companion ... block.
    sub = argparse.ArgumentParser(
        prog='--companion-group', add_help=False,
        description='per-companion patches (after each --companion)')
    _add_companion_flags(sub)
    parsed_groups = []
    for g in comp_groups:
        ga = sub.parse_args(g)
        if not ga.companion:
            sys.exit("internal: companion group missing --companion target")
        int_fields, stat_targets = _extract_companion_patches(ga)
        if not int_fields and not stat_targets:
            sys.exit(f"--companion {ga.companion!r}: no field patches given for this "
                     f"group (use --xp/--aptitude/--level/--str/--dex/--con/--wil/"
                     f"--mov/--crit/--used-aptitude/--used-upgrades)")
        parsed_groups.append({
            'identifier': ga.companion,
            'int_fields': int_fields,
            'stat_targets': stat_targets,
        })

    src = Path(args.input)
    if not src.exists():
        sys.exit(f"file not found: {src}")
    raw = src.read_bytes()
    magic, header, sig, body, encrypted, compressed = unpack(raw)
    body_ba = bytearray(body)
    print(f"=== {src.name} ===")
    print(f" magic={magic.decode('latin1')} encrypted={encrypted} compressed={compressed}")
    print(f" body={len(body):,} bytes")

    _main_print_resources(bytes(body_ba))
    if args.list_companions:
        _main_print_companions(bytes(body_ba))

    # Build resource patch list (keyed by internal Wartales name).
    res_patches = {internal: getattr(args, f'res_{flag}')
                   for internal, flag in RESOURCES
                   if getattr(args, f'res_{flag}') is not None}
    if not res_patches and not parsed_groups:
        if not args.list_companions:
            print("\nNo patches given. Nothing to write.")
        return

    # Apply resource patches first (no byte shifts).
    if res_patches:
        print(f"\nApplying {len(res_patches)} resource patch(es):")
        for r, v in res_patches.items():
            patch_resource(body_ba, r, v)

    # Apply each companion group. We re-enumerate before each one so byte
    # shifts from a prior group's upgrades rewrite stay consistent.
    resolved_targets = []  # for verification: [(idx, name, int_fields, stat_targets), ...]
    for grp in parsed_groups:
        comps = enumerate_companions(bytes(body_ba))
        comp = resolve_companion(comps, grp['identifier'])
        print(f"\nPatching companion #{comp['idx']} {comp['name']!r} ({comp['kind']}):")
        # Scalar int fields first (no byte shift).
        for field, val in grp['int_fields'].items():
            patch_companion_int(body_ba, comp, field, val)
        # Then upgrades-array edits (byte-shifting). Each set_upgrade_for_attr
        # call rewrites the full upgrades blob, so the comp dict's offsets
        # past upgrades_end_off become stale — we re-enumerate at the start
        # of the next group's iteration.
        if grp['stat_targets']:
            if comp.get('upgrades_lp1_off') is None:
                sys.exit(f"--companion {comp['name']!r}: upgrades array could not be "
                         f"located (heuristic failed — possibly non-empty status or "
                         f"skillLevels). Stat patches refused to avoid corrupting save.")
            for stat_key, val in grp['stat_targets'].items():
                set_upgrade_for_attr(body_ba, comp, BASE_STAT_KEYS[stat_key], val)
        resolved_targets.append(
            (comp['idx'], comp['name'], grp['int_fields'], grp['stat_targets']))

    # Determine output path.
    out = Path(args.output) if args.output else src.with_name(src.stem + '_patched' + src.suffix)
    new_dat = pack(magic, header, bytes(body_ba), encrypt=encrypted, compress=compressed)
    out.write_bytes(new_dat)
    print(f"\nWrote {out} ({len(new_dat):,} bytes)")

    # Round-trip verify against what was actually written.
    _, _, _, rt_body, _, _ = unpack(new_dat)
    if not _main_verify_round_trip(rt_body, res_patches, resolved_targets):
        sys.exit("round-trip verification FAILED")
# Run the command-line interface only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment