Last active
June 13, 2023 16:27
-
-
Save psobot/bf50c2090bb0fbe5380aefaafea17eed to your computer and use it in GitHub Desktop.
Kurzweil KOS File Packer/Unpacker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Kurzweil K2500/K2600 KOS operating system update file packer/unpacker
by Peter Sobot, Nov 6, 2021 | |
@psobot / [email protected] | |
Requirements: none! Pure Python. Just use Python 3.6+ (the code uses f-strings).
""" | |
import os | |
import math | |
import struct | |
import argparse | |
from glob import glob | |
# Number of data bytes read or written per block record (0x20000 = 128 KiB).
BLOCK_SIZE = 128 * 1024
def chunks(lst, n):
    """Yield consecutive slices of ``lst``, each holding at most ``n`` items.

    The final slice is shorter when ``len(lst)`` is not a multiple of ``n``.
    """
    starts = range(0, len(lst), n)
    for start in starts:
        stop = start + n
        yield lst[start:stop]
def compute_block_checksum(data: bytes, base: int = 0) -> int:
    """Return the 32-bit checksum of ``data``, starting from ``base``.

    Each byte is added to the running value; the sum is then shifted left by
    one bit, OR-ed with the sum shifted right by 31 bits, and truncated to
    32 bits. Whenever the sum fits in 32 bits this is a one-bit left rotation.
    """
    # NOTE(review): the sum is not truncated to 32 bits before the right
    # shift, so when it exceeds 0xFFFFFFFF the result differs from a true
    # 32-bit rotate. Kept as-is; confirm against the hardware's algorithm.
    checksum = base
    for value in data:
        total = value + checksum
        checksum = ((total << 1) | (total >> 31)) & 0xFFFFFFFF
    return checksum
def unpack(input_file: str, output_directory: str):
    """Given a path to a KOS file, write its binary contents to an output directory.

    The file must begin with a 4-byte ``SYS0`` or ``SYS2`` magic; for ``SYS2``
    the next 12 header bytes are skipped. The rest of the file is read as a
    sequence of records, each made of a 4-byte big-endian target address, up
    to ``BLOCK_SIZE`` bytes of data and a 4-byte checksum.

    A record whose address directly follows the end of the previous record is
    appended to the current output file. Any other address starts a new file
    named ``0x<address>.bin`` inside ``output_directory``, which is created on
    demand. Checksum mismatches are reported as warnings on stdout; the data
    is written regardless.

    Raises:
        ValueError: if the file does not start with ``SYS0`` or ``SYS2``.
    """
    output_file = None
    output_address = None
    chunk_index = 0

    with open(input_file, 'rb') as f:
        header = f.read(4)
        if header == b"SYS2":
            f.read(12)  # Skip number of files, file index, and number of blocks
        elif header != b"SYS0":
            raise ValueError(f"Expected a SYS0 or SYS2 header, but found: {header}.")

        try:
            while True:
                raw_address = f.read(4)
                if len(raw_address) < 4:
                    # End of file (or a truncated trailing address).
                    break
                address_to_write_to = int.from_bytes(raw_address, 'big', signed=False)

                # A non-contiguous address means the data belongs in a new file.
                if output_address != address_to_write_to:
                    if output_file:
                        print(f"Wrote {output_file.tell():,} bytes to {output_file.name}.")
                        output_file.close()
                        output_file = None
                    output_address = address_to_write_to
                    os.makedirs(output_directory, exist_ok=True)
                    output_file = open(
                        os.path.join(output_directory, f"0x{address_to_write_to:08x}.bin"), 'wb'
                    )

                chunk = f.read(BLOCK_SIZE)
                chunk_index += 1
                output_file.write(chunk)

                checksum = f.read(4)
                expected_checksum = compute_block_checksum(chunk).to_bytes(4, 'big')
                if expected_checksum != checksum:
                    print(
                        f"WARNING: Checksum for chunk {chunk_index:,} was expected to be"
                        f" {expected_checksum.hex()}, but found {checksum.hex()}"
                    )
                output_address += len(chunk)

            if output_file:
                print(f"Wrote {output_file.tell():,} bytes to {output_file.name}.")
        finally:
            # Close the current output file even if reading or writing failed.
            if output_file:
                output_file.close()
def _parse_flash_offset(filename: str) -> int:
    """Return the Flash ROM offset encoded as hex in a ``0x<offset>.bin`` filename."""
    stem = os.path.basename(filename).split(".")[0]
    try:
        return int(stem.split("0x")[-1], 16)
    except ValueError as e:
        raise ValueError(f"Failed to parse the Flash ROM offset from {filename}!") from e


def _write_block_records(f, blocks):
    """Write each (offset, data) pair to ``f`` as an address, data, checksum record."""
    for input_offset, data in blocks:
        print(f"Will write {len(data):,} bytes to Flash at 0x{input_offset:08x}.")
        f.write(struct.pack(">I", input_offset))
        f.write(data)
        f.write(struct.pack(">I", compute_block_checksum(data)))


def pack(input_directory: str, output_filename: str, output_format: str):
    """
    Given a path to a directory containing .bin files,
    named with their byte offsets to write to, write its
    binary contents to a single .KOS output file.

    Every ``*.bin`` file is split into ``BLOCK_SIZE``-byte blocks, with the last
    block zero-padded to full size. Blocks are written in ascending offset
    order, each as a big-endian 32-bit offset, the data and a big-endian
    32-bit checksum.

    ``output_format`` selects the container:

    * ``"SYS0"``: one file, the 4-byte magic followed by all block records.
    * ``"SYS2"``: blocks are split into groups of 11, one file per group. Each
      file is named by inserting a letter (``A``, ``B``, ...) before the
      ``.KOS`` suffix of ``output_filename``. After the magic, the header
      holds the number of files, this file's index and its block count, each
      as a big-endian 32-bit integer.

    Raises:
        ValueError: if a filename does not contain a hexadecimal offset, or
            if ``output_format`` is neither ``"SYS0"`` nor ``"SYS2"``.
    """
    input_files = sorted(glob(os.path.join(input_directory, "*.bin")))

    offset_to_data = {}
    for filename in input_files:
        input_offset = _parse_flash_offset(filename)
        with open(filename, "rb") as f:
            while True:
                block = f.read(BLOCK_SIZE)
                if not block:
                    break
                if len(block) < BLOCK_SIZE:
                    # Zero-pad the final, short block up to a full block.
                    block += bytes(BLOCK_SIZE - len(block))
                offset_to_data[input_offset] = block
                input_offset += len(block)

    # Offsets are unique dict keys, so sorting the items orders by offset.
    blocks = sorted(offset_to_data.items())

    if output_format == "SYS2":
        # Strip only a trailing ".KOS"; a letter and ".KOS" are re-appended below.
        if output_filename.endswith(".KOS"):
            output_filename = output_filename[: -len(".KOS")]

        # NOTE(review): 11 blocks per file is hard-coded; presumably chosen so
        # each part fits on a 1.44 MB floppy disk - confirm.
        blocks_per_file = list(chunks(blocks, 11))
        for file_index, file_blocks in enumerate(blocks_per_file):
            sub_filename = output_filename + chr(ord("A") + file_index) + ".KOS"
            print(f"Writing {len(file_blocks):,} blocks to {sub_filename}...")
            with open(sub_filename, 'wb') as f:
                f.write(output_format.encode("utf-8"))
                f.write(struct.pack(">I", len(blocks_per_file)))
                f.write(struct.pack(">I", file_index))
                f.write(struct.pack(">I", len(file_blocks)))
                _write_block_records(f, file_blocks)
                print(f"Wrote {f.tell():,} bytes to {sub_filename}.")
    elif output_format == "SYS0":
        print(f"Writing {len(blocks):,} blocks to {output_filename}...")
        with open(output_filename, 'wb') as f:
            f.write(output_format.encode("utf-8"))
            _write_block_records(f, blocks)
            print(f"Wrote {f.tell():,} bytes to {output_filename}.")
    else:
        raise ValueError(f"Unknown output format {output_format}!")
def main():
    """Parse the command line and pack or unpack accordingly.

    If ``input`` is a directory, its ``.bin`` files are packed into ``output``.
    Otherwise ``input`` is unpacked into ``output``, which must be a directory
    or a path that does not exist yet. An existing ``output`` file is only
    accepted when ``--overwrite`` is given.

    Raises:
        SystemExit: with status 1 if ``output`` is an existing file and
            ``--overwrite`` was not given, or if unpacking is requested into
            an existing path that is not a directory.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Packer/unpacker for Kurzweil K2500/K2600 OS update files. Will take a .KOS file and"
            " dump it into a directory of .bin files, named by their offsets in the Kurzweil's"
            " Flash ROM, or vice versa. Known to work on the K2500 and the K2600, but not the"
            " K2661.\n\nNote: When unpacking a K2600 OS (SYS2), unpack each of the files into"
            " a common output directory to ensure that they can be re-packed correctly."
        )
    )
    parser.add_argument("input", help="Input file or directory to use when unpacking.")
    parser.add_argument("output", help="Output file or directory to unpack data into.")
    parser.add_argument(
        "--overwrite", action="store_true", help="Allow overwriting output file/directory."
    )
    parser.add_argument(
        "--output-format",
        choices=["SYS0", "SYS2"],
        default="SYS0",
        help="Which file format to write, when writing: SYS0 for K2500, SYS2 for K2600.",
    )
    args = parser.parse_args()

    if os.path.isfile(args.output) and not args.overwrite:
        print(f"{args.output} already exists!")
        raise SystemExit(1)

    if os.path.isdir(args.input):
        pack(args.input, args.output, args.output_format)
    elif os.path.isdir(args.output) or not os.path.exists(args.output):
        unpack(args.input, args.output)
    else:
        # Unpacking needs a directory; previously this case silently did
        # nothing and still reported success.
        print(f"Cannot unpack into {args.output}: it exists and is not a directory!")
        raise SystemExit(1)
    print("Done!")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment