Skip to content

Instantly share code, notes, and snippets.

@prophile
Last active May 5, 2020 13:28
Show Gist options
  • Save prophile/5bca7adc2d0f5c2509605b6e022ff604 to your computer and use it in GitHub Desktop.
Save prophile/5bca7adc2d0f5c2509605b6e022ff604 to your computer and use it in GitHub Desktop.
"""
RLE extractor for EV Nova resource files.

Usage information is available via `python3 parserle.py --help`.

Requires both `pillow` and `rsrcfork` from PyPI. Alas, this is unlikely to
work on non-Darwin platforms.
"""
import os.path
import sys
import enum
import math
import struct
import argparse
import functools
import multiprocessing
from PIL import Image
import rsrcfork
def argument_parser():
    """Create the command-line parser for the extractor.

    Accepted arguments:
      * ``file`` (one or more): resource files to read.
      * ``-o`` / ``--output``: output directory, ``"."`` by default.
      * ``-r`` / ``--resource``: integer resource ID; may be repeated, each
        occurrence is appended to a list (empty by default).
      * ``-j`` / ``--jobs``: number of parallel jobs, ``1`` by default.
    """
    # Each entry is (flags, keyword arguments) for one add_argument() call.
    # The table is rebuilt on every call, so the list default is never shared.
    option_specs = (
        (("file",), {"type": str, "nargs": "+", "help": "Resource file."}),
        (
            ("-o", "--output"),
            {"type": str, "help": "Output directory.", "default": "."},
        ),
        (
            ("-r", "--resource"),
            {
                "type": int,
                "action": "append",
                "help": "Limit to particular resources.",
                "default": [],
            },
        ),
        (
            ("-j", "--jobs"),
            {
                "type": int,
                "help": "Number of parallel jobs to run.",
                "default": 1,
            },
        ),
    )
    cli = argparse.ArgumentParser()
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)
    return cli
class Opcode(enum.Enum):
    """
    Instruction codes of the RLE image stream.

    An RLE image holds several frames that all share one width and height.
    The decoder in this file reads the stream as 32-bit big-endian words: the
    top byte of a word is the opcode, the low 24 bits are its argument.

    END_OF_FRAME
    : The current frame is finished; continue with the next one.

    BEGIN_LINE
    : Advance to the start of the next scanline. Every frame opens with a
      BEGIN_LINE, which selects its first scanline.

    PIXEL_DATA
    : [argument] bytes of literal 16-bit pixel data follow and are copied
      as-is. The stream is then realigned to a 32-bit boundary, i.e. two
      padding bytes are skipped when [argument] is not a multiple of 4.

    TRANSPARENT_RUN
    : Skip [argument] _bytes_ worth of pixels in the scanline, leaving them
      transparent.

    PIXEL_RUN
    : Fill [argument] _bytes_ worth of pixels in the scanline from the single
      32-bit word that follows, which carries two 16-bit pixel values. The
      decoder in this file writes them alternately, upper half first.
    """

    END_OF_FRAME = 0x00
    BEGIN_LINE = 0x01
    PIXEL_DATA = 0x02
    TRANSPARENT_RUN = 0x03
    PIXEL_RUN = 0x04
def process_rled(id, name, data):
    """Decode an RLE-encoded resource into a single Pillow sprite sheet.

    Every frame of the resource is drawn into one RGBA image, arranged on a
    grid that is at most six frames wide. Pixels the stream never writes stay
    fully transparent.

    Args:
        id: Resource ID; only used in the progress output.
        name: Resource name; only used in the progress output.
        data: Raw resource contents as a bytes-like object.

    Returns:
        The RGBA image holding all decoded frames.

    Raises:
        ValueError: If the header announces a palette graphic, a depth other
            than 16 bpp or a non-positive frame count; if an unknown opcode
            is met; if a literal pixel run has an odd byte length; if pixels
            appear before the first BEGIN_LINE of a frame; or if a frame
            ends on the wrong scanline.
        AssertionError: If a pixel would land at or beyond the frame width.
    """
    print(f"Processing {id}({name}):")

    # Header: five big-endian signed 16-bit fields.
    width, height, bpp, palette, frame_count = struct.unpack(">hhhhh", data[:10])
    if palette != 0:
        raise ValueError("palette graphic")
    if bpp != 16:
        raise ValueError("only 16bpp supported")
    if frame_count <= 0:
        # A zero count would otherwise yield a zero-wide grid and crash with
        # ZeroDivisionError, which callers expecting ValueError do not catch.
        raise ValueError(f"invalid frame count: {frame_count}")
    print(frame_count, "frames")
    if frame_count % 36 == 0:
        print("maybe", frame_count // 36, "variants?")

    grid_width = min(6, frame_count)
    grid_height = math.ceil(frame_count / grid_width)
    image = Image.new(
        "RGBA", (width * grid_width, height * grid_height), (0, 0, 0, 0)
    )
    pixel_buffer = image.load()

    # Instruction decoding starts at byte 16; the bytes between the header
    # fields and this offset are not interpreted here.
    index = 16
    max_index = len(data)
    current_frame = 0
    current_line = None  # None until the frame's first BEGIN_LINE
    tapehead = 0  # column of the next pixel within the current scanline

    def put_pixel(column, pixel):
        """Write one 16-bit pixel at ``column`` of the current scanline."""
        if current_line is None:
            # Without this guard the offset arithmetic below fails with a
            # TypeError (int + None) on malformed streams.
            raise ValueError("pixel data before the first BEGIN_LINE of a frame")
        if column >= width:
            raise AssertionError(f"write past end of pixel: {column} >= {width}")
        grid_position_x = current_frame % grid_width
        grid_position_y = current_frame // grid_width
        grid_offset_x = grid_position_x * width
        grid_offset_y = grid_position_y * height
        x = grid_offset_x + column
        y = grid_offset_y + current_line
        try:
            # Three 5-bit channels, each moved into the top bits of a byte.
            pixel_buffer[x, y] = (
                (pixel & 0x7C00) >> 7,
                (pixel & 0x03E0) >> 2,
                (pixel & 0x001F) << 3,
                255,
            )
        except IndexError:
            print(
                f"Frame {current_frame} is at position {grid_position_x}, {grid_position_y}"
            )
            print(f" base offset is {grid_offset_x}, {grid_offset_y}")
            print(f" target pixel is {column}, {current_line}")
            print(f" final coördinates are {x}, {y}")
            print(f" frame width and height: {width}, {height}")
            print(f" grid width and height: {grid_width}, {grid_height}")
            print(f" total width and height: {image.width}, {image.height}")
            raise

    # The grand decode loop: one 32-bit instruction word per iteration.
    while index < max_index:
        (instruction,) = struct.unpack_from(">I", data, index)
        opcode = Opcode(instruction >> 24)  # ValueError on unknown opcodes
        argument = instruction & 0xFFFFFF
        index += 4

        if opcode == Opcode.END_OF_FRAME:
            if current_line != height - 1:
                raise ValueError(
                    f"spurious end-of-frame on line {current_line}, should be on line {height - 1}",
                )
            current_frame += 1
            current_line = None
            if current_frame == frame_count:
                # We've advanced through the last frame, exit the loop
                break
        elif opcode == Opcode.BEGIN_LINE:
            current_line = 0 if current_line is None else current_line + 1
            tapehead = 0
        elif opcode == Opcode.PIXEL_DATA:
            if argument & 1:
                raise ValueError("Odd run length")
            num_pixels = argument // 2
            pixels = struct.unpack_from(f">{num_pixels}H", data, index)
            for n, pixel in enumerate(pixels):
                put_pixel(tapehead + n, pixel)
            tapehead += num_pixels
            index += argument
            if argument & 2:
                # Literal data is padded so the next instruction starts on a
                # 32-bit boundary.
                index += 2
            assert index % 4 == 0
        elif opcode == Opcode.PIXEL_RUN:
            (run_command,) = struct.unpack_from(">I", data, index)
            index += 4
            first_pixel = run_command >> 16
            second_pixel = run_command & 0xFFFF
            # The argument counts bytes; each step covers up to two pixels.
            for i in range(0, argument, 4):
                put_pixel(tapehead, first_pixel)
                tapehead += 1
                if i + 2 < argument:
                    put_pixel(tapehead, second_pixel)
                    tapehead += 1
        elif opcode == Opcode.TRANSPARENT_RUN:
            # The argument counts bytes, two per pixel; skipped pixels keep
            # the transparent background.
            tapehead += argument // 2
    return image
def _job_handle_resource(id, name, data, output_dir):
    """Decode one resource and save the result as a PNG in ``output_dir``.

    Args:
        id: Resource ID, used for messages.
        name: Base name of the output file (``.png`` is appended).
        data: Raw resource contents; converted to ``bytes`` before decoding.
        output_dir: Directory the PNG is written to.

    A resource that fails to decode with ``ValueError`` is reported on
    standard output and skipped; nothing is written for it.
    """
    try:
        image = process_rled(id, name, bytes(data))
    except ValueError as e:
        print(f"Unable to process {id}: {e}")
        return

    # A path separator inside the name would make os.path.join point into a
    # subdirectory, so the save would fail or land outside the intended spot.
    # Flatten separators to keep the file directly under output_dir.
    safe_name = name.replace(os.sep, "_")
    if os.altsep:
        safe_name = safe_name.replace(os.altsep, "_")
    image.save(os.path.join(output_dir, f"{safe_name}.png"))
def make_job(resource, output_dir):
    """Bundle one resource and its output directory into a callable job.

    The job's display name is the resource ID, followed by a space and the
    MacRoman-decoded resource name when the resource has one. The returned
    ``functools.partial`` takes no arguments.
    """
    label = str(resource.id)
    if resource.name:
        label = f"{label} {resource.name.decode('macroman')}"
    return functools.partial(
        _job_handle_resource,
        resource.id,
        label,
        resource.data,
        output_dir,
    )
def process_file(file, output_dir, restrict_ids=None):
    """Yield one job for each RLE resource in a resource file.

    Args:
        file: Path of the resource file to open.
        output_dir: Directory handed on to every generated job.
        restrict_ids: Optional collection of resource IDs. When non-empty,
            only resources whose ID is in it produce a job.

    Yields:
        Zero-argument callables, one per selected resource. A file without
        any resource of the RLE type yields nothing.
    """
    # Keep the resource file open only while jobs are being generated.
    # make_job() reads each resource's data before the job is yielded, so the
    # jobs remain usable after the file is closed.
    with rsrcfork.open(file) as fork:
        try:
            rleds = fork["rlëD".encode("macroman")].values()
        except KeyError:
            rleds = []
        for resource in rleds:
            if restrict_ids and resource.id not in restrict_ids:
                continue
            yield make_job(resource, output_dir)
def _run_job(job):
"""Run a job by calling it."""
# This is outlined due to the limitations of the CPython pickle protocol
# with regard to lambdas.
job()
def main(args=None):
    """Run as main entry point.

    Args:
        args: Command-line arguments without the program name. ``None`` (the
            default) makes argparse read ``sys.argv[1:]`` at call time rather
            than freezing its value when this function is defined.

    Jobs for all input files are collected first. They then run serially when
    exactly one job slot is requested, and through a multiprocessing pool of
    the requested size otherwise.
    """
    options = argument_parser().parse_args(args)
    jobs = [
        job
        for file in options.file
        for job in process_file(file, options.output, options.resource)
    ]
    if options.jobs == 1:
        for job in jobs:
            job()
    else:
        with multiprocessing.Pool(options.jobs) as worker_pool:
            worker_pool.map(_run_job, jobs)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment