@mara004
Last active April 6, 2025 01:45
Page number spec parser [Draft]
# Four lines intentionally left blank

# SPDX-FileCopyrightText: 2025 geisserml <[email protected]>
# SPDX-License-Identifier: MPL-2.0

# Sophisticated parser for a page number mini-language
# Technically, this might be a use case for some parser generator like pyparsing or PLY, but this is a manual implementation based on common string operations.

# TODO: deferred generation of ref_translator and variables

__all__ = ("pnp", )

import re
import logging
from collections.abc import Iterable

logger = logging.getLogger(__name__)

#: Whether to use special debug methods. You may want to set this to False in a production setting.
IS_DEBUG = True
# -- Tokens --
# Tokens currently must be non-alphanumeric, single characters.

class Token (str):
    def __repr__(self):
        return "t" + super().__repr__()

OPEN      = Token("(")
CLOSE     = Token(")")
SEP       = Token(",")
RELATIVE  = Token("=")
DASH      = Token("-")  # backward or range
STEP      = Token("@")
EXCLUDE   = Token("/")
MULTIPLEX = Token("#")
REVERSE   = Token("~")
EVAL      = Token("!")
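
# A few example spec strings built from these tokens (drawn from the test
# samples in run_samples() below):
#   "1-7@2"        -> every 2nd page of 1..7 (DASH range, then STEP)
#   "-1"           -> backward indexing: the last page (DASH with empty left side)
#   "6#2"          -> page 6 twice (MULTIPLEX)
#   "all/(3,5-7)"  -> all pages except 3 and 5..7 (EXCLUDE)
#   "~3-5"         -> 3..5 reversed (REVERSE)
#   "A=2-3"        -> pages 2..3 of reference system "A" (RELATIVE)
#   "!range(2, 5)" -> evaluated as a Python expression (EVAL)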
# -- Token splitter groups --
# Let's hope that `re` internally builds a lookup table or similar.
# Note that EVAL is handled specially and thus not contained in OPERATORS_RE.

def _compile_op_group(*tokens):
    return re.compile("(" + "|".join(re.escape(tk) for tk in tokens) + ")")

SEP_RE       = _compile_op_group(SEP)
BRACKETS_RE  = _compile_op_group(OPEN, CLOSE)
OPERATORS_RE = _compile_op_group(RELATIVE, DASH, STEP, EXCLUDE, MULTIPLEX, REVERSE)
# -- Variables --

BEGIN = "begin"  # may be > 1 in RELATIVE context
END   = "end"
ODD   = "odd"
EVEN  = "even"
ALL   = "all"
# -- Marked types --
# See also the Token class above

class Group (list):
    def __repr__(self):
        return f"G({super().__repr__()[1:-1]})"
    def __str__(self):
        return f"{OPEN}{''.join(str(p) for p in self)}{CLOSE}"

class OpPart (list):
    def __repr__(self):
        return "p" + super().__repr__()

class ResolvedPart (list):
    def __repr__(self):  # pragma: no cover
        return "r" + super().__repr__()
# -- Exceptions --

class _RawBracketError (ValueError):
    def __init__(self, _, err_stack):
        self.err_stack = err_stack
        super().__init__()  # f"Raw bracket error: {err_stack}"

class BracketError (ValueError):
    def __init__(self, parts, err_stack):
        highlight = ""
        for group in err_stack:
            highlight += " "*(len(str(group)) - 2) + "^"
        super().__init__(f"Unmatched bracket(s):\n{''.join(parts)}\n{highlight}")
# -- Pre-processing functions (tokenization, grouping) --

# partition_by_tokens() is an re.split() wrapper with the addition of marking split characters as Token and cleaning up empty parts. An alternative implementation could use re.finditer().
def partition_by_tokens(text, regex, callback=None):
    parts = re.split(regex, text)
    # If provided, apply a callback on any non-tokens. This can be used to strip whitespace.
    if callback:
        for i in range(0, len(parts), 2):
            parts[i] = callback(parts[i])
    for i in range(1, len(parts), 2):
        parts[i] = Token(parts[i])
    return [p for p in parts if p != ""]
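
# For illustration, with the operator splitter this yields e.g.
#   partition_by_tokens("1-5", OPERATORS_RE, str.strip)  -> ['1', t'-', '5']
# where t'...' marks a Token (see Token.__repr__ above).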
def groupify(parts, exc_type=BracketError):
    group = Group()
    stack = []
    for i, p in enumerate(parts):
        if p == OPEN:
            # push current group to stack and open a new group
            stack.append(group)
            group = Group()
        elif p == CLOSE:
            if len(stack) > 0:
                # merge current group into nearest group on stack and pop
                stack[-1].append(group)
                group = stack.pop()
            else:  # unmatched closing brackets
                # recurse over the remaining part to show all errors at once
                # use two separate exception types to calculate highlighting only once for the caller-facing exception
                err_stack = [group]
                try:
                    groupify(parts[i+1:], exc_type=_RawBracketError)
                except _RawBracketError as e:
                    err_stack.extend(e.err_stack)
                raise exc_type(parts, err_stack)
        else:  # not isinstance(p, Token)
            group.append(p)
    # check against unmatched open brackets
    if len(stack) > 0:
        raise exc_type(parts, stack)
    return group
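
# For illustration, on the bracket-partitioned input of "1,(2)":
#   groupify(['1,', t'(', '2', t')'])  -> G('1,', G('2'))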
# -- Parser utility declarations --

def _tokenize_part(part):
    return partition_by_tokens(part, OPERATORS_RE, str.strip)

def _apply_excludes(base, excludes):
    not_found = []
    for v in excludes:
        # note, in case of multiple occurrences, this removes the leftmost item
        base.remove(v) if v in base else not_found.append(v)
    if len(not_found) > 0:
        raise ValueError(f"Unfound excludes: {base} / {not_found}")

def _resolve_to_tuple(item):
    return tuple(item) if isinstance(item, Iterable) else (item, )

def _prios_keyfunc(pair):
    _, (prio, _) = pair
    return prio

def indented(f, name):
    def variant(self, *args, **kwargs):
        self._pad += "  "  # two spaces, to match the truncation below
        logger.debug(self._pad + f"-- Recursive {name} --")
        try:
            return f(self, *args, **kwargs)
        finally:
            self._pad = self._pad[:-2]
    return variant

def fzip(*args, **kwargs):
    # or itertools.chain(*zip(*args, **kwargs))
    return (v for p in zip(*args, **kwargs) for v in p)
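
# For illustration: list(fzip((1, 2, 3), (4, 5, 6)))  -> [1, 4, 2, 5, 3, 6]
# (used by the !fzip(...) EVAL samples below)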
NO_OPERATOR = (0, None)


# -- The parser --

class PNParser:
    
    def __init__(self, ref_translator, variables):
        self.ref_translator = ref_translator
        self.variables = variables
        self.ctx = None  # initial
        self._pad = ""  # for indented debugging
    
    if IS_DEBUG:
        def debug(self, msg, value):
            logger.debug(f"{self._pad}{msg}:\n{self._pad} {value!r}")
    else:
        def debug(self, msg, value):
            pass
    # -- Main parser chain --
    
    # Root entry point
    def parse_input(self, string):
        self.debug("Input", string)
        bracket_parts = partition_by_tokens(string, BRACKETS_RE)
        self.debug("Tokenized Brackets", bracket_parts)
        root_group = groupify(bracket_parts)
        self.debug("Grouped by Brackets", root_group)
        pagenums = self.parse_group(root_group)
        return pagenums
    
    parse_input_rc = indented(parse_input, "PNP") if IS_DEBUG else parse_input
    
    # Group parser
    def parse_group(self, group):
        token_data = self.tokenize(group)
        self.debug("Tokenized Operators", token_data)
        out = []
        for part in token_data:
            parsed = self.PART_TABLE[type(part)](self, part)
            out.extend(parsed)
        return out
    
    parse_group_rc = indented(parse_group, "Group") if IS_DEBUG else parse_group
    # Group tokenizer
    def tokenize(self, group):
        splitted = self.shallow_split(group, SEP_RE)
        self.debug("Shallow splitted", splitted)
        meta_parts = []
        for sep_portion in splitted:
            part = []
            for i, p in enumerate(sep_portion):
                if isinstance(p, str):
                    if EVAL in p:
                        a, b = p.split(EVAL, maxsplit=1)
                        sep_remainder = sep_portion[i+1:]
                        if not b.strip() and sep_remainder and isinstance(sep_remainder[0], Group):
                            part.extend( [*_tokenize_part(a), EVAL] )
                        else:
                            eval_str = b + "".join(str(x) for x in sep_remainder)
                            part.extend( [*_tokenize_part(a), EVAL, eval_str] )
                            break
                    else:
                        part.extend(_tokenize_part(p))
                else:  # isinstance(p, Group)
                    part.append(p)
            meta_parts.append(part[0] if len(part) == 1 else OpPart(part))
        # tolerate trailing or multiple commas
        return [p for p in meta_parts if p]
    # Group splitter (shallow, i.e. not affecting nested groups)
    def shallow_split(self, group, regex):
        partitioned = []
        for item in group:
            if isinstance(item, str):
                partitioned.extend(partition_by_tokens(item, regex))
            else:  # isinstance(item, Group)
                partitioned.append(item)
        self.debug("Tokenized for split", partitioned)
        wrapped = []
        cursor = 0
        sym_indices = (i for i, v in enumerate(partitioned) if isinstance(v, Token))
        for i in sym_indices:
            wrapped.append(partitioned[cursor:i])
            cursor = i+1
        wrapped.append(partitioned[cursor:])
        return wrapped
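    
    # For illustration, on the group produced from "1,2,(3,4)":
    #   shallow_split(G('1,2,', G('3,4')), SEP_RE)  -> [['1'], ['2'], [G('3,4')]]
    # i.e. commas are split shallowly while the nested group stays intact.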
    # -- Operator caller and type handlers --
    # See also parse_group() above, and the tables below
    
    def _apply_ops(self, part, prios):
        if len(part) < 2:  # primitive
            value, = part  # unpack
            return self.PRIMITIVES_TABLE[type(value)](self, value)
        else:
            i, (prio, func) = max(enumerate(prios), key=_prios_keyfunc)
            # assert (prio, func) != NO_OPERATOR
            return func(self, part[:i], part[i+1:], prios[:i], prios[i+1:])
    
    def parse_op_part(self, part):
        prios = [self.OP_TABLE[x] if isinstance(x, Token) else NO_OPERATOR for x in part]
        return self._apply_ops(part, prios)
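    
    # For illustration: for the part ['1', t'-', '5', t'@', '2'], STEP has the
    # higher priority in OP_TABLE below, so _apply_ops() splits there first:
    #   on_step(left=['1', t'-', '5'], right=['2'])
    #   -> (1, 2, 3, 4, 5)[::2] -> (1, 3, 5)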
    def parse_str(self, value):
        # value = value.strip()
        if value.isdigit():
            return (int(value), )
        else:
            # if it's not a variable, but an invalid string, this will throw a KeyError
            var = self.variables[self.ctx][value.lower()]
            return _resolve_to_tuple(var)
    # -- Operator methods --
    
    def on_relative(self, left, right, lprios, rprios):
        leading_ops, label = left[:-1], left[-1]
        assert self.ctx is None, "Nested relativity is not meaningful"
        self.ctx = label
        try:
            values = self._apply_ops(right, rprios)
        finally:
            self.ctx = None
        output = [self.ref_translator[label][n] for n in values]
        if leading_ops:
            return self._apply_ops([*leading_ops, ResolvedPart(output)], [*lprios, NO_OPERATOR])
        else:
            return output
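    
    # For illustration, with the test data below: "A=2-3" resolves 2..3 through
    # ref_translator["A"] ({2: 8, 3: 9, ...}), yielding [8, 9].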
    def on_exclude(self, left, right, lprios, rprios):
        base = list( self._apply_ops(left, lprios) )
        excludes = self._apply_ops(right, rprios)
        _apply_excludes(base, excludes)  # modifies base in place -> TODO change this
        return base
    def on_reverse(self, left, right, lprios, rprios):
        assert not left
        values = self._apply_ops(right, rprios)
        return values[::-1]  # or tuple(reversed(values))
    def on_multiplex(self, left, right, lprios, rprios):
        values = self._apply_ops(left, lprios)
        count, = self._apply_ops(right, rprios)  # unpack
        return values * count
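    
    # For illustration: "6#2" -> (6,) * 2 -> (6, 6)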
    def on_step(self, left, right, lprios, rprios):
        values = self._apply_ops(left, lprios)
        step, = self._apply_ops(right, rprios)  # unpack
        return values[::step]
    
    def on_dash(self, left, right, lprios, rprios):
        if not left:
            stop = self.variables[self.ctx][END] + 1
            values = self._apply_ops(right, rprios)
            return tuple(stop - n for n in values)
        else:
            first, = self._apply_ops(left, lprios)  # unpack
            last, = self._apply_ops(right, rprios)  # unpack
            return tuple(range(first, last+1) if first < last else range(first, last-1, -1))
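    
    # For illustration: "7-1" -> (7, 6, 5, 4, 3, 2, 1) (descending range), while
    # "-1" (empty left side) -> (end+1 - 1, ), i.e. the last page.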
    def on_eval(self, left, right, lprios, rprios):
        assert not left
        value, = right  # unpack (raw)
        value = str(value)  # may be a Group
        namespace = {**self.variables[self.ctx], "V": self.variables, "R": self.ref_translator, "pnp": self.parse_input_rc, "fzip": fzip}
        out = eval(value, namespace)
        return _resolve_to_tuple(out)
    # -- Type/Operator tables --
    
    PRIMITIVES_TABLE = {
        str: parse_str,
        Group: parse_group_rc,
        ResolvedPart: lambda h, v: v,  # passthrough
    }
    PART_TABLE = PRIMITIVES_TABLE.copy()
    PART_TABLE[OpPart] = parse_op_part
    
    # Note that priority must be greater than that of NO_OPERATOR
    OP_TABLE = {
        # TOKEN: (prio, func)
        RELATIVE:  (7, on_relative),
        EXCLUDE:   (6, on_exclude),
        REVERSE:   (5, on_reverse),
        STEP:      (4, on_step),
        MULTIPLEX: (3, on_multiplex),
        DASH:      (2, on_dash),
        EVAL:      (1, on_eval),
    }
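    
    # In other words, the operator with the highest priority is split out first
    # and thus binds most loosely; e.g. in "1-5@2", STEP (4) outranks DASH (2),
    # so the DASH range is built first and then stepped.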
def validate(pagenums, variables):
    pass  # TODO

def get_variables(base_length, ref_translator):
    variables = {}
    stop = base_length + 1
    variables[None] = {BEGIN: 1, END: base_length, ODD: range(1, stop, 2), EVEN: range(2, stop, 2), ALL: range(1, stop)}
    # NOTE this requires an ordered mapping
    for label, mapping in ref_translator.items():
        odd, even = [], []
        for n in mapping.keys():
            (odd if n % 2 else even).append(n)
        variables[label] = {
            BEGIN: min(mapping.keys()), END: max(mapping.keys()),
            ODD: tuple(odd), EVEN: tuple(even), ALL: tuple(mapping.keys()),
        }
    return variables
def pnp(string, ref_translator=None, variables=None):
    parser = PNParser(ref_translator, variables)
    pagenums = parser.parse_input(string)
    validate(pagenums, variables)
    return pagenums
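
# Example usage (with the test fixtures defined below):
#   pnp("1-3, -1", t_ref_translator, t_variables)  -> [1, 2, 3, 20]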
if __name__ == "__main__":
    
    # testing
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.DEBUG)
    
    import time
    
    t_doclen = 20
    t_ref_translator = {"A": {2:8, 3:9, 4:10, 5:11, 6:12, 7:13, 10:16}, "1": {1:3}}
    t_variables = get_variables(t_doclen, t_ref_translator)
    logger.debug("Variables:\n" + '\n'.join(f"{k}: {v}" for k, v in t_variables.items()))
    
    def testcase(string, ref_translator=t_ref_translator, variables=t_variables):
        out = pnp(string, ref_translator, variables)
        print(out, end="\n\n")
        return out
    
    def assert_exc(string, exctype):
        try:
            testcase(string)
        except exctype as e:
            print(e, end="\n\n")
        else:  # pragma: no cover
            assert False, "Exception expected"
    
    def run_samples():
        # NOTE the test samples aren't particularly meaningful or creative at this time; they just traverse the code and exercise parser functionality
        assert_exc("1,2,(3,4)),(5,(6", BracketError)
        assert_exc("A=B=1", AssertionError)  # or "A=(B=1)"
        assert_exc("1-3/4", ValueError)
        print("Starting to time now.\n")
        starttime = time.time()
        testcase("1, 2, (3, (4, 5)), 6#2, 6-10/8, (~A=2-3)/9, 8-10/A=2, ~3-5")
        testcase("-1, !-1, !8-5, !(3-1)-6, A=!(begin, end), 1-8/!range(2, 5), !fzip((1,2,3), (4,5,6))")
        testcase("end, 1, -end, -1, -odd")
        testcase("end-1/(1-2), odd/1, even/2, all/(3,5-7)")
        testcase("A=(begin, end, even, odd, all)")
        testcase("A=(all/(2, 4)), A=all/(2,4), all/A=(2,4), 1=1")
        testcase("1-7@2, 7-1, , 1-5#2@2, ")
        testcase("!pnp('begin, end'), !fzip(pnp('5-10'), pnp('10-5'))")
        testcase("1--1, 1-(-1)", variables={None: dict(end=3)})  # 1-end
        duration = time.time() - starttime
        print(f"Duration (without exceptions) {duration}s")
    
    run_samples()
@mara004 (Author) commented Mar 5, 2025

Hack to avoid the key function in _apply_ops()

diff:
 src/pdfng_cli/pnp.py | 25 +++++++++++--------------
 1 file changed, 11 insertions(+), 14 deletions(-)

diff --git a/src/pdfng_cli/pnp.py b/src/pdfng_cli/pnp.py
index e6da003..646ccc9 100644
--- a/src/pdfng_cli/pnp.py
+++ b/src/pdfng_cli/pnp.py
@@ -154,10 +154,6 @@ def _resolve_to_listlike(item):
     else:
         return (item, )
 
-def _prios_keyfunc(pair):
-    _, (prio, _) = pair
-    return prio
-
 def indented(f, name):
     def variant(self, *args, **kwargs):
         self._pad += "  "
@@ -168,7 +164,7 @@ def indented(f, name):
             self._pad = self._pad[:-2]
     return variant
 
-NO_OPERATOR = (0, None)
+NO_OPERATOR = (255, None)
 
 
 # -- The parser --
@@ -270,7 +266,8 @@ class PNParser:
             value, = part  # unpack
             return self.PRIMITIVES_TABLE[type(value)](self, value)
         else:
-            i, (prio, func) = max(enumerate(prios), key=_prios_keyfunc)
+            # Use a tuple comparison hack to avoid the need for a key function. If there are multiple operators of the same priority (e.g. 1--1), we want the first to be prioritized, so use min(), albeit counter-intuitive. We can pass through the (non-comparable) func because i is incremented, so we never have to compare what comes after.
+            (prio, i, func) = min((prio, i, func) for i, (prio, func) in enumerate(prios))
             # assert (prio, func) != NO_OPERATOR
             return func(self, part[:i], part[i+1:], prios[:i], prios[i+1:])
     
@@ -362,16 +359,16 @@ class PNParser:
     PART_TABLE = PRIMITIVES_TABLE.copy()
     PART_TABLE[OpPart] = parse_op_part
     
-    # Note that priority must be greater than that of NO_OPERATOR
+    # Note that a lower number means higher priority, and priority must be less than that of NO_OPERATOR.
     OP_TABLE = {
         # Token(CHAR):    (prio, func)
-        Token(RELATIVE):  (7, on_relative),
-        Token(EXCLUDE):   (6, on_exclude),
-        Token(REVERSE):   (5, on_reverse),
-        Token(STEP):      (4, on_step),
-        Token(MULTIPLEX): (3, on_multiplex),
-        Token(DASH):      (2, on_dash),
-        Token(EVAL):      (1, on_eval),
+        Token(RELATIVE):  (0, on_relative),
+        Token(EXCLUDE):   (1, on_exclude),
+        Token(REVERSE):   (2, on_reverse),
+        Token(STEP):      (3, on_step),
+        Token(MULTIPLEX): (4, on_multiplex),
+        Token(DASH):      (5, on_dash),
+        Token(EVAL):      (6, on_eval),
     }

I don't know if that's desirable, though. For one thing, I don't like the index being included in comparisons.

@mara004 (Author) commented Mar 5, 2025

Or alternatively, with flattening + itemgetter():

diff:
 src/pdfng_cli/pnp.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/pdfng_cli/pnp.py b/src/pdfng_cli/pnp.py
index e6da003..f70acff 100644
--- a/src/pdfng_cli/pnp.py
+++ b/src/pdfng_cli/pnp.py
@@ -11,6 +11,7 @@ __all__ = ["pnp"]
 
 import re
 import logging
+from operator import itemgetter
 
 logger = logging.getLogger(__name__)
 
@@ -154,9 +155,7 @@ def _resolve_to_listlike(item):
     else:
         return (item, )
 
-def _prios_keyfunc(pair):
-    _, (prio, _) = pair
-    return prio
+_prios_keyfunc = itemgetter(1)
 
 def indented(f, name):
     def variant(self, *args, **kwargs):
@@ -270,7 +269,7 @@ class PNParser:
             value, = part  # unpack
             return self.PRIMITIVES_TABLE[type(value)](self, value)
         else:
-            i, (prio, func) = max(enumerate(prios), key=_prios_keyfunc)
+            i, prio, func = max(((i, prio, func) for i, (prio, func) in enumerate(prios)), key=_prios_keyfunc)
             # assert (prio, func) != NO_OPERATOR
             return func(self, part[:i], part[i+1:], prios[:i], prios[i+1:])

@mara004 (Author) commented Apr 6, 2025
Encapsulation patch:
 src/pdfng_cli/pnp.py | 35 +++++++++++++++++++++++++++--------
 1 file changed, 27 insertions(+), 8 deletions(-)

diff --git a/src/pdfng_cli/pnp.py b/src/pdfng_cli/pnp.py
index ff6f943..cb0579f 100644
--- a/src/pdfng_cli/pnp.py
+++ b/src/pdfng_cli/pnp.py
@@ -15,9 +15,27 @@ from collections.abc import Iterable
 logger = logging.getLogger(__name__)
 
 
-class Token (str):
+class Token:
+    
+    __slots__ = ("char", )
+    
+    def __init__(self, char):
+        self.char = char
+    
+    def __eq__(self, other):
+        if isinstance(other, Token):
+            return self.char == other.char
+        else:
+            return self.char == other
+    
+    def __hash__(self):
+        return hash(self.char)
+    
+    def __str__(self):
+        return self.char
+    
     def __repr__(self):
-        return "t" + super().__repr__()
+        return f"t{self.char!r}"
 
 # -- Tokens --
 # Tokens currently must be non-alphanumeric, single characters.
@@ -36,8 +54,8 @@ EVAL      = Token("!")
 # Let's hope that `re` internally builds a lookup table or similar.
 # Note that EVAL is handled specially and thus not contained in OPERATORS_RE.
 
-def _compile_op_group(*operators):
-    return re.compile("(" + "|".join(re.escape(op) for op in operators) + ")")
+def _compile_op_group(*tokens):
+    return re.compile("(" + "|".join(re.escape(t.char) for t in tokens) + ")")
 
 SEP_RE = _compile_op_group(SEP)
 BRACKETS_RE  = _compile_op_group(OPEN, CLOSE)
@@ -60,7 +78,7 @@ class Group (list):
         return f"G({super().__repr__()[1:-1]})"
     
     def __str__(self):
-        return f"{OPEN}{''.join(str(p) for p in self)}{CLOSE}"
+        return f"{OPEN.char}{''.join(str(p) for p in self)}{CLOSE.char}"
 
 class OpPart (list):
     def __repr__(self):
@@ -83,7 +101,7 @@ class BracketError (ValueError):
         highlight = ""
         for group in err_stack:
             highlight += " "*(len(str(group)) - 2) + "^"
-        super().__init__(f"Unmatched bracket(s):\n{''.join(parts)}\n{highlight}")
+        super().__init__(f"Unmatched bracket(s):\n{''.join(str(p) for p in parts)}\n{highlight}")
 
 
 # -- Pre-processing functions (tokenization, grouping) --
@@ -227,8 +245,8 @@ class PNParser:
             part = []
             for i, p in enumerate(sep_portion):
                 if isinstance(p, str):
-                    if EVAL in p:
-                        a, b = p.split(EVAL, maxsplit=1)
+                    if EVAL.char in p:
+                        a, b = p.split(EVAL.char, maxsplit=1)
                         sep_remainder = sep_portion[i+1:]
                         if not b.strip() and sep_remainder and isinstance(sep_remainder[0], Group):
                             part.extend( [*_tokenize_part(a), EVAL] )
@@ -275,6 +293,7 @@ class PNParser:
             value, = part  # unpack
             return self.PRIMITIVES_TABLE[type(value)](self, value)
         else:
+            # print(part, prios)
             i, (prio, func) = max(enumerate(prios), key=_prios_keyfunc)
             # assert (prio, func) != NO_OPERATOR
             return func(self, part[:i], part[i+1:], prios[:i], prios[i+1:])
