Last active
May 5, 2020 13:28
-
-
Save prophile/5bca7adc2d0f5c2509605b6e022ff604 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
RLE extractor for EV Nova resource files.

Usage information is available through `python3 parserle.py --help`.

Requires both `pillow` and `rsrcfork` from PyPI. Alas, this is unlikely to
work on non-Darwin platforms.
""" | |
import os.path | |
import sys | |
import enum | |
import math | |
import struct | |
import argparse | |
import functools | |
import multiprocessing | |
from PIL import Image | |
import rsrcfork | |
def argument_parser():
    """Create the command-line parser for the extractor.

    Returns:
        An ``argparse.ArgumentParser`` accepting one or more positional
        resource files plus the ``-o/--output``, ``-r/--resource``
        (repeatable) and ``-j/--jobs`` options.
    """
    # Each entry is (flags, keyword settings) for one add_argument() call.
    argument_specs = (
        (("file",), {"type": str, "nargs": "+", "help": "Resource file."}),
        (
            ("-o", "--output"),
            {"type": str, "default": ".", "help": "Output directory."},
        ),
        (
            ("-r", "--resource"),
            {
                "type": int,
                "action": "append",
                "default": [],
                "help": "Limit to particular resources.",
            },
        ),
        (
            ("-j", "--jobs"),
            {
                "type": int,
                "default": 1,
                "help": "Number of parallel jobs to run.",
            },
        ),
    )

    cli = argparse.ArgumentParser()
    for flags, settings in argument_specs:
        cli.add_argument(*flags, **settings)
    return cli
@enum.unique
class Opcode(enum.Enum):
    """
    Instruction codes found in RLE sprite data.

    An RLE image holds several frames that all share one width and height.
    Every instruction is a 32-bit word: the top 8 bits select the opcode and
    the low 24 bits are its argument.

    END_OF_FRAME
    : The current frame is finished; continue with the next one.

    BEGIN_LINE
    : Advance to the start of the next scanline. A frame's first scanline is
      also introduced by a BEGIN_LINE.

    PIXEL_DATA
    : [argument] bytes of literal 16-bit pixels follow. The data is padded so
      that the next instruction starts on a 32-bit boundary: when [argument]
      is not a multiple of 4, two extra bytes follow.

    TRANSPARENT_RUN
    : Skip [argument] _bytes_ worth of pixels on the scanline, leaving them
      transparent.

    PIXEL_RUN
    : Fill [argument] _bytes_ worth of pixels from the single 32-bit word that
      follows, which carries two 16-bit pixel values. The decoder in this
      file writes the two values alternately, starting with the high half.
    """

    # Numeric values are the on-disk opcode bytes; do not renumber.
    END_OF_FRAME = 0
    BEGIN_LINE = 1
    PIXEL_DATA = 2
    TRANSPARENT_RUN = 3
    PIXEL_RUN = 4
def process_rled(id, name, data):
    """Decode one RLE resource into a single Pillow image of all its frames.

    The resource begins with five big-endian signed 16-bit fields (width,
    height, bits per pixel, palette, frame count); the instruction stream
    starts at byte offset 16. Each instruction is a 32-bit big-endian word
    whose top byte is an ``Opcode`` and whose low 24 bits are the argument.
    Frames are placed on a grid at most six frames wide, filled left to right
    and then top to bottom.

    Args:
        id: Resource ID; only used in the progress message.
        name: Resource name; only used in the progress message.
        data: Raw bytes of the resource.

    Returns:
        An RGBA ``PIL.Image.Image`` of size ``(width * grid_width,
        height * grid_height)``. Pixels the stream never writes stay fully
        transparent.

    Raises:
        ValueError: The header is truncated, the graphic is palettised or not
            16bpp, the frame count or frame size is not positive, the
            instruction stream is truncated, an opcode is unknown, pixels are
            emitted before the first BEGIN_LINE, a literal run has an odd
            byte length, or END_OF_FRAME arrives on the wrong scanline.
        AssertionError: A scanline writes at or beyond the frame width.
        IndexError: A pixel lands outside the output image (diagnostics are
            printed before re-raising).
    """
    print(f"Processing {id}({name}):")

    # Decode the header
    if len(data) < 10:
        raise ValueError("truncated header")
    width, height, bpp, palette, frame_count = struct.unpack_from(">hhhhh", data, 0)
    if palette != 0:
        raise ValueError("palette graphic")
    if bpp != 16:
        raise ValueError("only 16bpp supported")
    # A zero frame count would otherwise divide by zero when sizing the grid.
    if frame_count <= 0:
        raise ValueError(f"invalid frame count: {frame_count}")
    if width <= 0 or height <= 0:
        raise ValueError(f"invalid frame size: {width}x{height}")

    print(frame_count, "frames")
    if frame_count % 36 == 0:
        print("maybe", frame_count // 36, "variants?")

    grid_width = min(6, frame_count)
    grid_height = math.ceil(frame_count / grid_width)

    image = Image.new("RGBA", (width * grid_width, height * grid_height), (0, 0, 0, 0))
    pixel_buffer = image.load()

    index = 16  # the instruction stream starts after the header area
    max_index = len(data)
    current_frame = 0
    current_line = None  # None until the frame's first BEGIN_LINE
    tapehead = 0  # column of the next pixel on the current scanline

    def put_pixel(column, pixel):
        """Write one 16-bit pixel at *column* of the current frame/scanline."""
        if current_line is None:
            raise ValueError("pixel data before first BEGIN_LINE")
        if column >= width:
            raise AssertionError(f"write past end of pixel: {column} >= {width}")
        grid_position_x = current_frame % grid_width
        grid_position_y = current_frame // grid_width
        grid_offset_x = grid_position_x * width
        grid_offset_y = grid_position_y * height
        x = grid_offset_x + column
        y = grid_offset_y + current_line
        try:
            # Three 5-bit channels (masks 0x7C00 / 0x03E0 / 0x001F), each
            # shifted into the top five bits of an 8-bit channel; fully opaque.
            pixel_buffer[x, y] = (
                (pixel & 0x7C00) >> 7,
                (pixel & 0x03E0) >> 2,
                (pixel & 0x001F) << 3,
                255,
            )
        except IndexError:
            print(
                f"Frame {current_frame} is at position {grid_position_x}, {grid_position_y}"
            )
            print(f" base offset is {grid_offset_x}, {grid_offset_y}")
            print(f" target pixel is {column}, {current_line}")
            print(f" final coördinates are {x}, {y}")
            print(f" frame width and height: {width}, {height}")
            print(f" grid width and height: {grid_width}, {grid_height}")
            print(f" total width and height: {image.width}, {image.height}")
            raise

    # The grand decode loop
    try:
        while index < max_index:
            (instruction,) = struct.unpack_from(">I", data, index)
            opcode = Opcode(instruction >> 24)  # ValueError on unknown opcode
            argument = instruction & 0xFFFFFF
            index += 4

            if opcode == Opcode.END_OF_FRAME:
                if current_line != height - 1:
                    raise ValueError(
                        f"spurious end-of-frame on line {current_line}, should be on line {height - 1}",
                    )
                current_frame += 1
                current_line = None
                if current_frame == frame_count:
                    # We've advanced through the last frame, exit the loop
                    break
            elif opcode == Opcode.BEGIN_LINE:
                if current_line is None:
                    current_line = 0
                else:
                    current_line += 1
                tapehead = 0
            elif opcode == Opcode.PIXEL_DATA:
                if argument & 1:
                    raise ValueError("Odd run length")
                # Literal data is padded out to the next 32-bit boundary.
                needs_realignment = bool(argument & 2)
                num_pixels = argument // 2
                pixels = struct.unpack_from(f">{num_pixels}H", data, index)
                for n, pixel in enumerate(pixels):
                    put_pixel(tapehead + n, pixel)
                tapehead += num_pixels
                index += argument
                if needs_realignment:
                    index += 2
                # Internal invariant, not input validation: every step above
                # keeps the cursor 4-byte aligned.
                assert index % 4 == 0
            elif opcode == Opcode.PIXEL_RUN:
                # One 32-bit word carries two pixel values, written alternately.
                (run_command,) = struct.unpack_from(">I", data, index)
                index += 4
                for i in range(0, argument, 4):
                    put_pixel(tapehead, run_command >> 16)
                    tapehead += 1
                    # The second pixel of the pair only exists when the run
                    # covers more than two further bytes.
                    if i + 2 < argument:
                        put_pixel(tapehead, run_command & 0xFFFF)
                        tapehead += 1
            elif opcode == Opcode.TRANSPARENT_RUN:
                # The argument is in bytes; two bytes per pixel.
                tapehead += argument // 2
    except struct.error as err:
        # struct.error is not a ValueError; normalise it so callers that skip
        # bad resources on ValueError also skip truncated ones.
        raise ValueError(f"truncated RLE data near offset {index}") from err

    return image
def _job_handle_resource(id, name, data, output_dir):
    """Decode a single resource and write it out as ``<name>.png``.

    Args:
        id: Resource ID, passed to the decoder and used in the error message.
        name: Display name; also the base name of the output file.
        data: Resource payload; converted to ``bytes`` before decoding.
        output_dir: Directory the PNG is written into.

    A ``ValueError`` raised by the decoder is reported on stdout and the
    resource is skipped; any other exception propagates.
    """
    try:
        image = process_rled(id, name, bytes(data))
    except ValueError as error:
        print(f"Unable to process {id}: {error}")
        return

    destination = os.path.join(output_dir, f"{name}.png")
    image.save(destination)
def make_job(resource, output_dir):
    """Build a zero-argument callable that extracts *resource*.

    The job's display name is the resource ID, followed by a space and the
    MacRoman-decoded resource name when the resource has a non-empty name.

    Args:
        resource: Object exposing ``id``, ``name`` and ``data`` attributes.
        output_dir: Directory the job will write its PNG into.

    Returns:
        A ``functools.partial`` wrapping ``_job_handle_resource``.
    """
    label = str(resource.id)
    if resource.name:
        label = f"{label} {resource.name.decode('macroman')}"
    return functools.partial(
        _job_handle_resource, resource.id, label, resource.data, output_dir
    )
def process_file(file, output_dir, restrict_ids=None):
    """Yield one extraction job per ``rlëD`` resource found in *file*.

    Args:
        file: Path of the resource file to open with ``rsrcfork``.
        output_dir: Directory handed to each job for its output.
        restrict_ids: Optional collection of resource IDs. When non-empty,
            resources whose ID is not in it are skipped.

    Yields:
        Callables produced by ``make_job``. A file without any ``rlëD``
        resources yields nothing.
    """
    # Keep the resource file open only while jobs are being generated, and
    # close it when the generator finishes or is closed. make_job reads
    # resource.data while building each job, i.e. before the file is closed.
    with rsrcfork.open(file) as fork:
        try:
            rleds = fork["rlëD".encode("macroman")].values()
        except KeyError:
            rleds = []

        for resource in rleds:
            if restrict_ids and resource.id not in restrict_ids:
                continue
            yield make_job(resource, output_dir)
def _run_job(job):
    """Invoke *job* with no arguments and discard whatever it returns."""
    # Kept as a named module-level function rather than a lambda: the worker
    # pool has to pickle the callable it maps over, and CPython's pickle
    # cannot serialise lambdas.
    job()
def main(args=None):
    """Run as main entry point.

    Args:
        args: Command-line arguments without the program name. ``None`` (the
            default) means ``sys.argv[1:]`` as it is when ``main`` is called,
            rather than a snapshot taken when the module was imported.

    Every job from every input file is collected first. The jobs then run
    sequentially when ``--jobs`` is 1, or across a ``multiprocessing.Pool``
    with that many workers otherwise.
    """
    if args is None:
        args = sys.argv[1:]

    parser = argument_parser()
    options = parser.parse_args(args)
    if options.jobs < 1:
        # Pool() rejects a worker count below 1 with a bare traceback; report
        # it as a normal usage error instead.
        parser.error("--jobs must be at least 1")

    jobs = []
    for file in options.file:
        jobs.extend(process_file(file, options.output, options.resource))

    if options.jobs == 1:
        for job in jobs:
            job()
    else:
        with multiprocessing.Pool(options.jobs) as worker_pool:
            worker_pool.map(_run_job, jobs)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment