Last active
April 14, 2025 01:06
-
-
Save mara004/428a9aad5d553d4631ab0b5119eb74b2 to your computer and use it in GitHub Desktop.
PDF rendering with Ghostscript, revisited (ABI bindings, in-memory approach)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Four lines intentionally left blank | |
# SPDX-FileCopyrightText: 2025 geisserml <[email protected]> | |
# SPDX-License-Identifier: MPL-2.0 OR GPL-3.0-or-later | |
# Note that Ghostscript is AGPL-licensed, so this code is altogether affected by copyleft | |
# Written with Ghostscript 9.56.1 on Fedora. | |
# Dev Resources: | |
# - The comments in ghostscript's public headers. They are actually more straightforward than the HTML pages IMO. | |
# - https://ghostscript.readthedocs.io/en/latest/API.html | |
# - https://ghostscript.readthedocs.io/en/latest/Use.html | |
# - https://ghostscript.readthedocs.io/en/latest/Devices.html | |
# - https://ghostscript.readthedocs.io/en/latest/LanguageBindingsPython.html | |
# - https://github.com/ArtifexSoftware/ghostpdl/tree/master/demos/python
# - https://gitlab.com/pdftools/python-ghostscript
# - https://github.com/albel727/rust-ghostscript/tree/master/ghostscript-rs/examples/memory-render
# Auto-generate the libgs bindings module via pypdfium2-ctypesgen: | |
# ctypesgen -i /usr/include/ghostscript/*.h -o libgs.py -l gs --no-symbol-guards --no-macro-guards | |
# FIXME ctypes.util.find_library() may not be able to find ghostscript on Windows | |
from libgs import * | |
from ctypes import * | |
import libgs | |
import ctypes | |
import sys | |
import atexit | |
import signal | |
from pathlib import Path | |
# Compatibility shim: functools.cached_property only exists on Python 3.8 and later.
try:
    from functools import cached_property
except ImportError:
    from functools import lru_cache

    def cached_property(func):
        """Fallback: a read-only property whose getter is memoized by a one-entry LRU cache."""
        memoized = lru_cache(maxsize=1)(func)
        return property(memoized)
def log(*args, **kwargs):
    """Print a diagnostic message, to stderr unless the caller passes ``file=``.

    Takes the same positional and keyword arguments as the built-in print().
    """
    # setdefault() rather than a hard-coded file= so that an explicit file=
    # from the caller no longer raises "got multiple values for keyword argument".
    kwargs.setdefault("file", sys.stderr)
    print(*args, **kwargs)
class _LazyClass: | |
@cached_property | |
def numpy(self): | |
log("Evaluating lazy import 'numpy' ...") | |
import numpy; return numpy | |
@cached_property | |
def PIL_Image(self): | |
log("Evaluating lazy import 'PIL.Image' ...") | |
import PIL.Image; return PIL.Image | |
Lazy = _LazyClass() | |
GLOBAL_RC = 0 | |
def _interrupt_handler(signum, frame): | |
global GLOBAL_RC | |
log("\n*** SIGINT received ***") | |
GLOBAL_RC = -1 # or maybe gs_error_InterpreterExit | |
# Always propagate the interrupt so we don't have to wait on python code in callbacks. | |
# This seems to cause some pollution compared to just waiting until the callback returns, but quick exit is more important to us. | |
raise KeyboardInterrupt() | |
signal.signal(signal.SIGINT, _interrupt_handler) | |
# Library scope | |
def init_gs():
    """Create a Ghostscript instance and switch its argument encoding to UTF-8.

    Returns the c_void_p handle that was passed to gsapi_new_instance().
    Raises AssertionError if either API call reports a failure code.
    """
    instance = c_void_p()
    status = gsapi_new_instance(instance, None)
    assert status >= 0
    status = gsapi_set_arg_encoding(instance, GS_ARG_ENCODING_UTF8)
    assert status == 0
    return instance


# Module-wide Ghostscript instance; deleted again when the Python interpreter exits.
minst = init_gs()
atexit.register(gsapi_delete_instance, minst)
def run_args(py_args):
    """Run Ghostscript once on the module-wide instance with the given argument strings.

    py_args: iterable of str. An empty placeholder is prepended as argv[0].
    Raises AssertionError if gsapi_init_with_args() or gsapi_exit() returns
    anything other than 0 or gs_error_Quit.
    """
    # Slot 0 is the empty argv[0] placeholder; every real argument is UTF-8
    # encoded with an explicit trailing NUL.
    argv_bytes = [b""]
    argv_bytes.extend((arg + "\0").encode("utf-8") for arg in py_args)
    argc = len(argv_bytes)
    argv_array = (c_char_p * argc)(*argv_bytes)
    argv_ptr = cast(argv_array, POINTER(POINTER(c_char)))
    try:
        init_rc = gsapi_init_with_args(minst, argc, argv_ptr)
        assert init_rc in (0, gs_error_Quit), init_rc
    finally:
        # gsapi_exit() runs whether or not the init call succeeded.
        exit_rc = gsapi_exit(minst)
        assert exit_rc in (0, gs_error_Quit), exit_rc
class Bitmap:
    """Plain record for one rendered page: width, height, stride, format and pixel buffer."""

    def __init__(self, width, height, stride, format, buffer):
        # Assignment order is significant: __repr__ lists the attributes in this order.
        self.width = width
        self.height = height
        self.stride = stride
        self.format = format
        self.buffer = buffer

    def __repr__(self):
        field_values = tuple(vars(self).values())
        return f"{Bitmap.__name__}{field_values}"

    # TODO: to_*(): handle self.format instead of hard-coding RGB

    def to_pil(self):
        """Return a PIL image built from the buffer with Image.frombuffer(), read as RGB."""
        size = (self.width, self.height)
        # Raw decoder arguments: source mode, bytes per row, orientation (1 = top->bottom).
        return Lazy.PIL_Image.frombuffer(
            "RGB", size, self.buffer, "raw", "RGB", self.stride, 1
        )

    def to_numpy(self):
        """Return a (height, width, 3) numpy array of unsigned bytes backed by the buffer."""
        n_channels = 3
        # Row-major layout; strides are the byte steps per row, per pixel and per channel value.
        row_major_shape = (self.height, self.width, n_channels)
        byte_strides = (self.stride, n_channels, 1)
        return Lazy.numpy.ndarray(
            shape=row_major_shape,
            dtype=c_ubyte,
            buffer=self.buffer,
            strides=byte_strides,
        )
class DisplayImpls:
    """Python implementations of the Ghostscript display callbacks used by this script.

    State (all slots):
      buffer   -- ctypes byte array handed out by display_memalloc(), None while unallocated
      bitmap   -- Bitmap describing the page announced by display_size(), None between pages
      callback -- callable(bitmap, count) invoked by display_page(); None until a caller sets it
      count    -- page counter passed to the callback, incremented after each page

    Every callback returns GLOBAL_RC on success (non-zero after SIGINT) or -1 on a
    detected inconsistency.
    """

    __slots__ = ("bitmap", "buffer", "callback", "count")

    def __init__(self):
        self.buffer, self.bitmap = None, None
        # Slots have no implicit default. Initialise these as well, so that a page
        # callback arriving before a caller configured them hits the explicit check
        # in display_page() instead of raising AttributeError.
        self.callback = None
        self.count = 0

    def _assert_mem_match(self, mem):
        """Check that address `mem` is the address of our buffer; return -1 if not, else GLOBAL_RC."""
        if self.buffer is None:
            # addressof(None) would raise TypeError; report the inconsistency instead.
            log(f"!!! PANIC: no buffer allocated, but got address {mem}")
            return -1
        buffer_addr = addressof(self.buffer)
        if mem != buffer_addr:
            log(f"!!! PANIC: addresses don't match: {mem} != {buffer_addr}")
            return -1
        return GLOBAL_RC

    def display_size(self, _, device, width, height, stride, format, pimage):
        """Record the page geometry in a fresh Bitmap (its buffer is attached later, in display_page)."""
        # Note, this function may be called multiple times per page (typically twice).
        log("SIZE", device, width, height, stride, format, pimage)
        rc = self._assert_mem_match(addressof(pimage.contents))
        if rc != 0:
            return rc
        self.bitmap = Bitmap(width, height, stride, format, None)
        return GLOBAL_RC

    def display_page(self, _, device, copies, flush):
        """Hand the finished page to the user callback and advance the page counter."""
        log("PAGE", device, copies, flush)
        if not (self.buffer and self.bitmap and self.callback):
            log("!!! PANIC: required attribute(s) not set")
            return -1
        self.bitmap.buffer = self.buffer
        self.callback(self.bitmap, self.count)
        self.count += 1
        self.bitmap = None
        return GLOBAL_RC

    def display_memalloc(self, _, device, size):
        """Allocate a `size`-byte ctypes buffer and return its address (None once GLOBAL_RC is non-zero)."""
        log("MEMALLOC", size)
        self.buffer = (c_ubyte * size)()
        # Important: don't use cast(self.buffer, c_void_p).value - this causes a memory leak,
        # probably due to a persistent reference or something.
        # Thus, we have to assume that address and pointer values are identical on the host platform.
        return addressof(self.buffer) if GLOBAL_RC == 0 else None

    def display_memfree(self, _, device, mem):
        """Drop our reference to the buffer after checking that `mem` is its address."""
        # We do not need to actually free the memory here because the buffer has been allocated
        # by ctypes and will be automatically released after its refcount has reached zero.
        log("MEMFREE (intentionally ignored)", device, mem)
        rc = self._assert_mem_match(mem)
        if rc != 0:
            return rc
        self.buffer = None
        return GLOBAL_RC


# Single shared instance whose bound methods are wrapped into the callback struct.
impls_obj = DisplayImpls()
# Field name -> ctypes function type, taken from the callback struct definition.
dsp_fields = dict(display_callback_s._fields_)
WITH_DEBUG = True  # whether to show dummy calls


def dummy_field(name):
    """Build a placeholder callback for the struct field ``display_<name>``.

    The returned ctypes callback ignores its arguments (optionally logging them
    when WITH_DEBUG is set) and returns GLOBAL_RC.
    Raises KeyError if the struct has no field of that name.
    """
    functype = dsp_fields[f"display_{name}"]

    @functype
    def dummy_impl(*args):
        # Use log() like the real callbacks do, so that all callback traces go to
        # stderr and stay in order relative to each other.
        if WITH_DEBUG:
            log(name.upper(), *args)
        return GLOBAL_RC

    return dummy_impl
def actual_field(name):
    """Wrap the method ``display_<name>`` of impls_obj in the ctypes function type of that struct field."""
    field = f"display_{name}"
    functype = dsp_fields[field]
    method = getattr(impls_obj, field)
    return functype(method)
# def null_field(name): | |
# return cast(None, dsp_fields[f"display_{name}"]) | |
def _get_dsp_struct_ver():
    """Return the (major, minor) display struct version to declare; the major is capped at 3."""
    if DISPLAY_VERSION_MAJOR > 3:
        # Bindings newer than v3: declare v3, with the v3 minor constant if the bindings have one.
        return 3, getattr(libgs, "DISPLAY_VERSION_MINOR_V3", 0)
    return DISPLAY_VERSION_MAJOR, DISPLAY_VERSION_MINOR
dsp_major_v, dsp_minor_v = _get_dsp_struct_ver()
log(f"Display struct version: {dsp_major_v}, {dsp_minor_v}")

# The display callback table handed to Ghostscript.
m_callback_s = display_callback_s(
    size=sizeof(display_callback_s),
    version_major=dsp_major_v,
    version_minor=dsp_minor_v,
)
# Callbacks we only acknowledge (and optionally trace).
for _name in ("open", "preclose", "close", "presize", "sync"):
    setattr(m_callback_s, f"display_{_name}", dummy_field(_name))
# Callbacks with a real implementation on impls_obj.
for _name in ("size", "page", "memalloc", "memfree"):
    setattr(m_callback_s, f"display_{_name}", actual_field(_name))
# Deliberately left unset: display_update, display_separation (v2),
# display_adjust_band_height (v3), display_rectangle_request (v3).
# Selects how the callback table is registered: True uses gsapi_set_display_callback(),
# False registers a callout handler that supplies the table on request.
LEGACY_METHOD_OK = False

if not LEGACY_METHOD_OK:
    @gs_callout
    def m_callout_handler(instance, callout_handle, device_name, callout_id, size, data):
        """Answer the display device's "get callback" callout; return -1 for anything else."""
        from_display = bool(device_name) and string_at(device_name) == b"display"
        if not from_display:
            return -1
        if callout_id != DISPLAY_CALLOUT_GET_CALLBACK:
            return -1
        request = cast(data, POINTER(gs_display_get_callback_t)).contents
        request.callback = pointer(m_callback_s)
        request.caller_handle = callout_handle
        return 0

    # we don't care about the handle, so pass through None
    rc = gsapi_register_callout(minst, m_callout_handler, None)
    assert rc == 0
else:
    # we don't care about the handle, so no need to pass -dDisplayHandle=... later
    rc = gsapi_set_display_callback(minst, byref(m_callback_s))
    assert rc == 0
def render(input_f, password=None, pages=None, dpi=300, output_prefix=None, device="png16m", display_format="16#804", display_callback=None):
    """Render a document with Ghostscript, either to files or through the display callbacks.

    input_f          -- path of the input document (converted with str())
    password         -- optional PDF password, passed as -sPDFPassword
    pages            -- optional page selection, passed as -sPageList
    dpi              -- resolution for both axes, rounded to 6 decimals
    output_prefix    -- required unless device == "display"; files are named "<prefix>_%d.png"
    device           -- Ghostscript device name; "display" selects the in-memory path
    display_format   -- value for -dDisplayFormat (display device only, see Devices.html)
    display_callback -- callable(bitmap, count) invoked per page (display device only)

    Raises AssertionError on missing required arguments or when run_args() fails.
    See https://ghostscript.readthedocs.io/en/latest/Use.html
    """
    dpi = round(dpi, 6)
    args = [
        "-dNOPAUSE",
        "-dBATCH",
        "-dSAFER",
        # "-dQUIET",
        "-dNOPROMPT",
        f"-sDEVICE={device}",
        f"-r{dpi:f}x{dpi:f}",
        # The options below are for smoother text rendering. The default text render mode is unbearable.
        "-dInterpolateControl=-1",
        "-dTextAlphaBits=4",
        "-dGraphicsAlphaBits=4",
    ]
    if pages:
        # alternatively -dFirstPage and -dLastPage
        args.append(f"-sPageList={pages}")
    if password:
        args.append(f"-sPDFPassword={password}")

    is_display = device == "display"
    if is_display:
        assert display_format and display_callback
        impls_obj.count = 0
        impls_obj.callback = display_callback
        args.append(f"-dDisplayFormat={display_format}")  # see Devices.html
    else:
        assert output_prefix
        args.append(f"-sOutputFile={output_prefix}_%d.png")

    try:
        args.extend(["-f", str(input_f)])
        # Trace the command line on stderr like all other diagnostics, but never
        # echo the document password in clear text.
        password_opt = "-sPDFPassword="
        shown_args = [f"{password_opt}***" if a.startswith(password_opt) else a for a in args]
        log(shown_args)
        run_args(args)
    finally:
        # Detach the per-run callback state again, even if rendering failed.
        if is_display:
            impls_obj.count, impls_obj.callback = 0, None
class SaverClass:
    """Page callback target that saves every rendered bitmap as "<prefix>_<n>.<ext>" (n starts at 1)."""

    def __init__(self, prefix, ext="jpg"):
        self.prefix = prefix
        self.ext = ext

    def _get_path(self, count):
        """Return the output path for the zero-based page counter `count`."""
        return f"{self.prefix}_{count+1}.{self.ext}"

    def write_pil(self, bitmap, count):
        """Save `bitmap` through its direct PIL conversion."""
        pil_image = bitmap.to_pil()
        pil_image.save(self._get_path(count))

    def write_pil_from_numpy(self, bitmap, count):
        """Save `bitmap` by converting it to a numpy array first and then to a PIL image."""
        # TODO: handle bitmap.format instead of hardcoding RGB
        # PIL is never imported at module level in this file; it is only reachable
        # through the lazy import holder, so a bare PIL.Image would raise NameError.
        pil_image = Lazy.PIL_Image.fromarray(bitmap.to_numpy(), "RGB")
        pil_image.save(self._get_path(count))
def main():
    """Command-line entry point: render pages of a PDF to JPEG files via the display device.

    Arguments: INPUT_PDF OUTPUT_PREFIX PAGES DPI
    e.g. ~/Downloads/move/pdfs/CinelerraGG_Manual.pdf ./out/test 1-30 200
    """
    if len(sys.argv) < 5:
        # Give a usage hint instead of failing with a bare IndexError.
        print(f"Usage: {sys.argv[0]} INPUT_PDF OUTPUT_PREFIX PAGES DPI", file=sys.stderr)
        sys.exit(2)
    input_f = Path(sys.argv[1]).expanduser().resolve()
    output_prefix = Path(sys.argv[2]).expanduser().resolve()
    pages = sys.argv[3]
    dpi = int(sys.argv[4])
    # Output files are written next to the prefix; create that directory up front so
    # the first save inside the page callback does not fail on a missing directory.
    output_prefix.parent.mkdir(parents=True, exist_ok=True)
    saver = SaverClass(output_prefix, ext="jpg")
    render(input_f, pages=pages, dpi=dpi, device="display", display_callback=saver.write_pil)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Sample
libgs
ctypesgen bindings interface:click to expand