Page number spec parser [Draft]
# Four lines intentionally left blank

# SPDX-FileCopyrightText: 2025 geisserml <[email protected]>
# SPDX-License-Identifier: MPL-2.0

# Sophisticated parser for a page number mini-language
# Technically, this might be a use case for some parser generator like pyparsing or PLY, but this is a manual implementation based on common string operations.

# TODO: deferred generation of ref_translator and variables

__all__ = ("pnp", )

import re
import logging
from collections.abc import Iterable

logger = logging.getLogger(__name__)

#: Whether to use special debug methods. You may want to set this to False in a production setting.
IS_DEBUG = True

# -- Tokens --
# Tokens currently must be non-alphanumeric, single characters.

class Token (str):
    def __repr__(self):
        return "t" + super().__repr__()

OPEN = Token("(")
CLOSE = Token(")")
SEP = Token(",")
RELATIVE = Token("=")
DASH = Token("-")  # backward or range
STEP = Token("@")
EXCLUDE = Token("/")
MULTIPLEX = Token("#")
REVERSE = Token("~")
EVAL = Token("!")
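
# Illustrative examples of the mini-language (see the operator methods and the
# test samples at the bottom for the authoritative behavior):
#   "3-7"           range 3..7 (DASH)
#   "-1"            counted from the end, i.e. the last page (DASH with empty left side)
#   "1-7@2"         every 2nd page of 1..7 (STEP)
#   "6#2"           page 6, twice (MULTIPLEX)
#   "all/(2,4)"     all pages except 2 and 4 (EXCLUDE)
#   "~3-5"          3..5 reversed (REVERSE)
#   "A=2-3"         pages 2..3 relative to reference label A, mapped via ref_translator (RELATIVE)
#   "!range(2, 5)"  evaluated as a Python expression (EVAL)
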
# -- Token splitter groups --
# Let's hope that `re` internally builds a lookup table or similar.
# Note that EVAL is handled specially and thus not contained in OPERATORS_RE.

def _compile_op_group(*tokens):
    return re.compile("(" + "|".join(re.escape(tk) for tk in tokens) + ")")

SEP_RE = _compile_op_group(SEP)
BRACKETS_RE = _compile_op_group(OPEN, CLOSE)
OPERATORS_RE = _compile_op_group(RELATIVE, DASH, STEP, EXCLUDE, MULTIPLEX, REVERSE)

# -- Variables --

BEGIN = "begin"  # may be > 1 in RELATIVE context
END = "end"
ODD = "odd"
EVEN = "even"
ALL = "all"

# -- Marked types --
# See also the Token class above

class Group (list):
    def __repr__(self):
        return f"G({super().__repr__()[1:-1]})"
    def __str__(self):
        return f"{OPEN}{''.join(str(p) for p in self)}{CLOSE}"

class OpPart (list):
    def __repr__(self):
        return "p" + super().__repr__()

class ResolvedPart (list):
    def __repr__(self):  # pragma: no cover
        return "r" + super().__repr__()

# -- Exceptions --

class _RawBracketError (ValueError):
    def __init__(self, _, err_stack):
        self.err_stack = err_stack
        super().__init__()  # f"Raw bracket error: {err_stack}"

class BracketError (ValueError):
    def __init__(self, parts, err_stack):
        highlight = ""
        for group in err_stack:
            highlight += " "*(len(str(group)) - 2) + "^"
        super().__init__(f"Unmatched bracket(s):\n{''.join(parts)}\n{highlight}")

# -- Pre-processing functions (tokenization, grouping) --

# partition_by_tokens() is an re.split() wrapper with the addition of marking split characters as Token and cleaning up empty parts. An alternative implementation could use re.finditer().

def partition_by_tokens(text, regex, callback=None):
    parts = re.split(regex, text)
    # If provided, apply a callback on any non-tokens. This can be used to strip whitespace.
    if callback:
        for i in range(0, len(parts), 2):
            parts[i] = callback(parts[i])
    for i in range(1, len(parts), 2):
        parts[i] = Token(parts[i])
    return [p for p in parts if p != ""]
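
# For illustration:
#   partition_by_tokens("1, (2-3)", BRACKETS_RE)              ->  ['1, ', t'(', '2-3', t')']
#   partition_by_tokens(" 8 - 10 ", OPERATORS_RE, str.strip)  ->  ['8', t'-', '10']
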
def groupify(parts, exc_type=BracketError):
    group = Group()
    stack = []
    for i, p in enumerate(parts):
        if p == OPEN:
            # push current group to stack and open a new group
            stack.append(group)
            group = Group()
        elif p == CLOSE:
            if len(stack) > 0:
                # merge current group into nearest group on stack and pop
                stack[-1].append(group)
                group = stack.pop()
            else:  # unmatched closing brackets
                # recurse over the remaining part to show all errors at once
                # use two separate exception types to calculate highlighting only once for the caller-facing exception
                err_stack = [group]
                try:
                    groupify(parts[i+1:], exc_type=_RawBracketError)
                except _RawBracketError as e:
                    err_stack.extend(e.err_stack)
                raise exc_type(parts, err_stack)
        else:  # not isinstance(p, Token)
            group.append(p)
    # check against unmatched open brackets
    if len(stack) > 0:
        raise exc_type(parts, stack)
    return group
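
# For illustration, nesting follows the brackets:
#   groupify(['1, ', t'(', '2-3', t')'])  ->  G('1, ', G('2-3'))
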
# -- Parser utility declarations --

def _tokenize_part(part):
    return partition_by_tokens(part, OPERATORS_RE, str.strip)

def _apply_excludes(base, excludes):
    not_found = []
    for v in excludes:
        # note, in case of multiple occurrences, this removes the leftmost item
        if v in base:
            base.remove(v)
        else:
            not_found.append(v)
    if len(not_found) > 0:
        raise ValueError(f"Unfound excludes: {base} / {not_found}")

def _resolve_to_tuple(item):
    return tuple(item) if isinstance(item, Iterable) else (item, )

def _prios_keyfunc(pair):
    _, (prio, _) = pair
    return prio

def indented(f, name):
    def variant(self, *args, **kwargs):
        self._pad += "  "  # two spaces, popped again via [:-2] below
        logger.debug(self._pad + f"-- Recursive {name} --")
        try:
            return f(self, *args, **kwargs)
        finally:
            self._pad = self._pad[:-2]
    return variant

def fzip(*args, **kwargs):
    # or itertools.chain(*zip(*args, **kwargs))
    return (v for p in zip(*args, **kwargs) for v in p)
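
# For illustration, fzip() interleaves its iterables:
#   list(fzip((1, 2, 3), (4, 5, 6)))  ->  [1, 4, 2, 5, 3, 6]
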
NO_OPERATOR = (0, None)

# -- The parser --

class PNParser:

    def __init__(self, ref_translator, variables):
        self.ref_translator = ref_translator
        self.variables = variables
        self.ctx = None  # initial
        self._pad = ""  # for indented debugging

    if IS_DEBUG:
        def debug(self, msg, value):
            logger.debug(f"{self._pad}{msg}:\n{self._pad} {value!r}")
    else:
        def debug(self, msg, value):
            pass

    # -- Main parser chain --

    # Root entry point
    def parse_input(self, string):
        self.debug("Input", string)
        bracket_parts = partition_by_tokens(string, BRACKETS_RE)
        self.debug("Tokenized Brackets", bracket_parts)
        root_group = groupify(bracket_parts)
        self.debug("Grouped by Brackets", root_group)
        pagenums = self.parse_group(root_group)
        return pagenums

    parse_input_rc = indented(parse_input, "PNP") if IS_DEBUG else parse_input

    # Group parser
    def parse_group(self, group):
        token_data = self.tokenize(group)
        self.debug("Tokenized Operators", token_data)
        out = []
        for part in token_data:
            parsed = self.PART_TABLE[type(part)](self, part)
            out.extend(parsed)
        return out

    parse_group_rc = indented(parse_group, "Group") if IS_DEBUG else parse_group

    # Group tokenizer
    def tokenize(self, group):
        splitted = self.shallow_split(group, SEP_RE)
        self.debug("Shallow splitted", splitted)
        meta_parts = []
        for sep_portion in splitted:
            part = []
            for i, p in enumerate(sep_portion):
                if isinstance(p, str):
                    if EVAL in p:
                        a, b = p.split(EVAL, maxsplit=1)
                        sep_remainder = sep_portion[i+1:]
                        if not b.strip() and sep_remainder and isinstance(sep_remainder[0], Group):
                            part.extend( [*_tokenize_part(a), EVAL] )
                        else:
                            eval_str = b + "".join(str(x) for x in sep_remainder)
                            part.extend( [*_tokenize_part(a), EVAL, eval_str] )
                            break
                    else:
                        part.extend(_tokenize_part(p))
                else:  # isinstance(p, Group)
                    part.append(p)
            meta_parts.append(part[0] if len(part) == 1 else OpPart(part))
        # tolerate trailing or multiple commas
        return [p for p in meta_parts if p]

    # Group splitter (shallow, i.e. not affecting nested groups)
    def shallow_split(self, group, regex):
        partitioned = []
        for item in group:
            if isinstance(item, str):
                partitioned.extend(partition_by_tokens(item, regex))
            else:  # isinstance(item, Group)
                partitioned.append(item)
        self.debug("Tokenized for split", partitioned)
        wrapped = []
        cursor = 0
        sym_indices = (i for i, v in enumerate(partitioned) if isinstance(v, Token))
        for i in sym_indices:
            wrapped.append(partitioned[cursor:i])
            cursor = i+1
        wrapped.append(partitioned[cursor:])
        return wrapped
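
    # For illustration, splitting at top-level separators only:
    #   shallow_split(G('1,2,', G('3-4')), SEP_RE)  ->  [['1'], ['2'], [G('3-4')]]
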
    # -- Operator caller and type handlers --
    # See also parse_group() above, and the tables below

    def _apply_ops(self, part, prios):
        if len(part) < 2:  # primitive
            value, = part  # unpack
            return self.PRIMITIVES_TABLE[type(value)](self, value)
        else:
            i, (prio, func) = max(enumerate(prios), key=_prios_keyfunc)
            # assert (prio, func) != NO_OPERATOR
            return func(self, part[:i], part[i+1:], prios[:i], prios[i+1:])

    def parse_op_part(self, part):
        prios = [self.OP_TABLE[x] if isinstance(x, Token) else NO_OPERATOR for x in part]
        return self._apply_ops(part, prios)
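
    # For illustration, the operator with the highest priority is split on first,
    # i.e. it binds least tightly. E.g. with the sample t_ref_translator from the
    # __main__ block, "8-10/A=2" resolves as follows:
    #   RELATIVE (prio 7) splits first:  A=2 -> ref_translator["A"][2] == 8
    #   EXCLUDE (prio 6) splits next:    8-10 / (8)
    #   DASH (prio 2) resolves innermost: 8-10 -> (8, 9, 10); excluding 8 gives [9, 10]
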
    def parse_str(self, value):
        # value = value.strip()
        if value.isdigit():
            return (int(value), )
        else:
            # if it's not a variable, but an invalid string, this will throw a KeyError
            var = self.variables[self.ctx][value.lower()]
            return _resolve_to_tuple(var)

    # -- Operator methods --

    def on_relative(self, left, right, lprios, rprios):
        leading_ops, label = left[:-1], left[-1]
        assert self.ctx is None, "Nested relativity is not meaningful"
        self.ctx = label
        try:
            values = self._apply_ops(right, rprios)
        finally:
            self.ctx = None
        output = [self.ref_translator[label][n] for n in values]
        if leading_ops:
            return self._apply_ops([*leading_ops, ResolvedPart(output)], [*lprios, NO_OPERATOR])
        else:
            return output

    def on_exclude(self, left, right, lprios, rprios):
        base = list( self._apply_ops(left, lprios) )
        excludes = self._apply_ops(right, rprios)
        _apply_excludes(base, excludes)  # modifies base in place -> TODO change this
        return base

    def on_reverse(self, left, right, lprios, rprios):
        assert not left
        values = self._apply_ops(right, rprios)
        return values[::-1]  # or tuple(reversed(values))

    def on_multiplex(self, left, right, lprios, rprios):
        values = self._apply_ops(left, lprios)
        count, = self._apply_ops(right, rprios)  # unpack
        return values * count

    def on_step(self, left, right, lprios, rprios):
        values = self._apply_ops(left, lprios)
        step, = self._apply_ops(right, rprios)  # unpack
        return values[::step]

    def on_dash(self, left, right, lprios, rprios):
        if not left:
            stop = self.variables[self.ctx][END] + 1
            values = self._apply_ops(right, rprios)
            return tuple(stop - n for n in values)
        else:
            first, = self._apply_ops(left, lprios)  # unpack
            last, = self._apply_ops(right, rprios)  # unpack
            return tuple(range(first, last+1) if first < last else range(first, last-1, -1))
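
    # For illustration (with end == 20):
    #   "-1"   ->  (20,)             counted backward from the end
    #   "3-7"  ->  (3, 4, 5, 6, 7)   ascending range
    #   "7-3"  ->  (7, 6, 5, 4, 3)   descending range
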
    def on_eval(self, left, right, lprios, rprios):
        assert not left
        value, = right  # unpack (raw)
        value = str(value)  # may be a Group
        namespace = {**self.variables[self.ctx], "V": self.variables, "R": self.ref_translator, "pnp": self.parse_input_rc, "fzip": fzip}
        out = eval(value, namespace)
        return _resolve_to_tuple(out)
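
    # For illustration:
    #   "!range(2, 5)"             ->  (2, 3, 4)
    #   "!fzip((1,2,3), (4,5,6))"  ->  (1, 4, 2, 5, 3, 6)
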
    # -- Type/Operator tables --

    PRIMITIVES_TABLE = {
        str: parse_str,
        Group: parse_group_rc,
        ResolvedPart: lambda h, v: v,  # passthrough
    }
    PART_TABLE = PRIMITIVES_TABLE.copy()
    PART_TABLE[OpPart] = parse_op_part

    # Note that priority must be greater than that of NO_OPERATOR
    OP_TABLE = {
        # TOKEN: (prio, func)
        RELATIVE: (7, on_relative),
        EXCLUDE: (6, on_exclude),
        REVERSE: (5, on_reverse),
        STEP: (4, on_step),
        MULTIPLEX: (3, on_multiplex),
        DASH: (2, on_dash),
        EVAL: (1, on_eval),
    }

def validate(pagenums, variables):
    pass  # TODO

def get_variables(base_length, ref_translator):
    variables = {}
    stop = base_length + 1
    variables[None] = {BEGIN: 1, END: base_length, ODD: range(1, stop, 2), EVEN: range(2, stop, 2), ALL: range(1, stop)}
    # NOTE this requires an ordered mapping
    for label, mapping in ref_translator.items():
        odd, even = [], []
        for n in mapping.keys():
            (odd if n % 2 else even).append(n)
        variables[label] = {
            BEGIN: min(mapping.keys()), END: max(mapping.keys()),
            ODD: tuple(odd), EVEN: tuple(even), ALL: tuple(mapping.keys()),
        }
    return variables
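
# For illustration:
#   get_variables(4, {"A": {2: 8, 3: 9}})  ->
#   {None: {'begin': 1, 'end': 4, 'odd': range(1, 5, 2), 'even': range(2, 5, 2), 'all': range(1, 5)},
#    'A':  {'begin': 2, 'end': 3, 'odd': (3,), 'even': (2,), 'all': (2, 3)}}
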
def pnp(string, ref_translator=None, variables=None):
    parser = PNParser(ref_translator, variables)
    pagenums = parser.parse_input(string)
    validate(pagenums, variables)
    return pagenums
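
# For illustration:
#   pnp("1-3, -1, all/2", ref_translator={}, variables=get_variables(5, {}))  ->  [1, 2, 3, 5, 1, 3, 4, 5]
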
if __name__ == "__main__":

    # testing
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.DEBUG)

    import time

    t_doclen = 20
    t_ref_translator = {"A": {2:8, 3:9, 4:10, 5:11, 6:12, 7:13, 10:16}, "1": {1:3}}
    t_variables = get_variables(t_doclen, t_ref_translator)
    logger.debug("Variables:\n" + '\n'.join(f"{k}: {v}" for k, v in t_variables.items()))

    def testcase(string, ref_translator=t_ref_translator, variables=t_variables):
        out = pnp(string, ref_translator, variables)
        print(out, end="\n\n")
        return out

    def assert_exc(string, exctype):
        try:
            testcase(string)
        except exctype as e:
            print(e, end="\n\n")
        else:  # pragma: no cover
            assert False, "Exception expected"

    def run_samples():
        # NOTE the test samples aren't particularly meaningful or creative at this time, they're just to traverse the code and test parser functionality

        assert_exc("1,2,(3,4)),(5,(6", BracketError)
        assert_exc("A=B=1", AssertionError)  # or "A=(B=1)"
        assert_exc("1-3/4", ValueError)

        print("Starting to time now.\n")
        starttime = time.time()

        testcase("1, 2, (3, (4, 5)), 6#2, 6-10/8, (~A=2-3)/9, 8-10/A=2, ~3-5")
        testcase("-1, !-1, !8-5, !(3-1)-6, A=!(begin, end), 1-8/!range(2, 5), !fzip((1,2,3), (4,5,6))")
        testcase("end, 1, -end, -1, -odd")
        testcase("end-1/(1-2), odd/1, even/2, all/(3,5-7)")
        testcase("A=(begin, end, even, odd, all)")
        testcase("A=(all/(2, 4)), A=all/(2,4), all/A=(2,4), 1=1")
        testcase("1-7@2, 7-1, , 1-5#2@2, ")
        testcase("!pnp('begin, end'), !fzip(pnp('5-10'), pnp('10-5'))")
        testcase("1--1, 1-(-1)", variables={None: dict(end=3)})  # 1-end

        duration = time.time() - starttime
        print(f"Duration (without exceptions) {duration}s")

    run_samples()
Hack to avoid the key function in _apply_ops()
src/pdfng_cli/pnp.py | 25 +++++++++++--------------
1 file changed, 11 insertions(+), 14 deletions(-)
diff --git a/src/pdfng_cli/pnp.py b/src/pdfng_cli/pnp.py
index e6da003..646ccc9 100644
--- a/src/pdfng_cli/pnp.py
+++ b/src/pdfng_cli/pnp.py
@@ -154,10 +154,6 @@ def _resolve_to_listlike(item):
     else:
         return (item, )
-def _prios_keyfunc(pair):
-    _, (prio, _) = pair
-    return prio
-
 def indented(f, name):
     def variant(self, *args, **kwargs):
         self._pad += " "
@@ -168,7 +164,7 @@ def indented(f, name):
         self._pad = self._pad[:-2]
     return variant
-NO_OPERATOR = (0, None)
+NO_OPERATOR = (255, None)
 # -- The parser --
@@ -270,7 +266,8 @@ class PNParser:
             value, = part  # unpack
             return self.PRIMITIVES_TABLE[type(value)](self, value)
         else:
-            i, (prio, func) = max(enumerate(prios), key=_prios_keyfunc)
+            # Use a tuple comparison hack to avoid the need for a key function. If there are multiple operators of the same priority (e.g. 1--1), we want the first to be prioritized, so use min(), albeit counter-intuitive. We can pass through the (non-comparable) func because i is incremented, so we never have to compare what comes after.
+            (prio, i, func) = min((prio, i, func) for i, (prio, func) in enumerate(prios))
             # assert (prio, func) != NO_OPERATOR
             return func(self, part[:i], part[i+1:], prios[:i], prios[i+1:])
@@ -362,16 +359,16 @@ class PNParser:
     PART_TABLE = PRIMITIVES_TABLE.copy()
     PART_TABLE[OpPart] = parse_op_part
-    # Note that priority must be greater than that of NO_OPERATOR
+    # Note that a lower number means higher priority, and priority must be less than that of NO_OPERATOR.
     OP_TABLE = {
         # Token(CHAR): (prio, func)
-        Token(RELATIVE): (7, on_relative),
-        Token(EXCLUDE): (6, on_exclude),
-        Token(REVERSE): (5, on_reverse),
-        Token(STEP): (4, on_step),
-        Token(MULTIPLEX): (3, on_multiplex),
-        Token(DASH): (2, on_dash),
-        Token(EVAL): (1, on_eval),
+        Token(RELATIVE): (0, on_relative),
+        Token(EXCLUDE): (1, on_exclude),
+        Token(REVERSE): (2, on_reverse),
+        Token(STEP): (3, on_step),
+        Token(MULTIPLEX): (4, on_multiplex),
+        Token(DASH): (5, on_dash),
+        Token(EVAL): (6, on_eval),
     }
I don't know if that's desirable, though. For one thing, I don't like the index being included in comparisons.
Or alternatively with flattening + itemgetter():
src/pdfng_cli/pnp.py | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/src/pdfng_cli/pnp.py b/src/pdfng_cli/pnp.py
index e6da003..f70acff 100644
--- a/src/pdfng_cli/pnp.py
+++ b/src/pdfng_cli/pnp.py
@@ -11,6 +11,7 @@ __all__ = ["pnp"]
 import re
 import logging
+from operator import itemgetter
 logger = logging.getLogger(__name__)
@@ -154,9 +155,7 @@ def _resolve_to_listlike(item):
     else:
         return (item, )
-def _prios_keyfunc(pair):
-    _, (prio, _) = pair
-    return prio
+_prios_keyfunc = itemgetter(1)
 def indented(f, name):
     def variant(self, *args, **kwargs):
@@ -270,7 +269,7 @@ class PNParser:
             value, = part  # unpack
             return self.PRIMITIVES_TABLE[type(value)](self, value)
         else:
-            i, (prio, func) = max(enumerate(prios), key=_prios_keyfunc)
+            i, prio, func = max(((i, prio, func) for i, (prio, func) in enumerate(prios)), key=_prios_keyfunc)
             # assert (prio, func) != NO_OPERATOR
             return func(self, part[:i], part[i+1:], prios[:i], prios[i+1:])
Encapsulation patch:
src/pdfng_cli/pnp.py | 35 +++++++++++++++++++++++++++--------
1 file changed, 27 insertions(+), 8 deletions(-)
diff --git a/src/pdfng_cli/pnp.py b/src/pdfng_cli/pnp.py
index ff6f943..cb0579f 100644
--- a/src/pdfng_cli/pnp.py
+++ b/src/pdfng_cli/pnp.py
@@ -15,9 +15,27 @@ from collections.abc import Iterable
 logger = logging.getLogger(__name__)
-class Token (str):
+class Token:
+
+    __slots__ = ("char", )
+
+    def __init__(self, char):
+        self.char = char
+
+    def __eq__(self, other):
+        if isinstance(other, Token):
+            return self.char == other.char
+        else:
+            return self.char == other
+
+    def __hash__(self):
+        return hash(self.char)
+
+    def __str__(self):
+        return self.char
+
     def __repr__(self):
-        return "t" + super().__repr__()
+        return f"t{self.char!r}"
 # -- Tokens --
 # Tokens currently must be non-alphanumeric, single characters.
@@ -36,8 +54,8 @@ EVAL = Token("!")
 # Let's hope that `re` internally builds a lookup table or similar.
 # Note that EVAL is handled specially and thus not contained in OPERATORS_RE.
-def _compile_op_group(*operators):
-    return re.compile("(" + "|".join(re.escape(op) for op in operators) + ")")
+def _compile_op_group(*tokens):
+    return re.compile("(" + "|".join(re.escape(t.char) for t in tokens) + ")")
 SEP_RE = _compile_op_group(SEP)
 BRACKETS_RE = _compile_op_group(OPEN, CLOSE)
@@ -60,7 +78,7 @@ class Group (list):
         return f"G({super().__repr__()[1:-1]})"
     def __str__(self):
-        return f"{OPEN}{''.join(str(p) for p in self)}{CLOSE}"
+        return f"{OPEN.char}{''.join(str(p) for p in self)}{CLOSE.char}"
 class OpPart (list):
     def __repr__(self):
@@ -83,7 +101,7 @@ class BracketError (ValueError):
         highlight = ""
         for group in err_stack:
             highlight += " "*(len(str(group)) - 2) + "^"
-        super().__init__(f"Unmatched bracket(s):\n{''.join(parts)}\n{highlight}")
+        super().__init__(f"Unmatched bracket(s):\n{''.join(str(p) for p in parts)}\n{highlight}")
 # -- Pre-processing functions (tokenization, grouping) --
@@ -227,8 +245,8 @@ class PNParser:
             part = []
             for i, p in enumerate(sep_portion):
                 if isinstance(p, str):
-                    if EVAL in p:
-                        a, b = p.split(EVAL, maxsplit=1)
+                    if EVAL.char in p:
+                        a, b = p.split(EVAL.char, maxsplit=1)
                         sep_remainder = sep_portion[i+1:]
                         if not b.strip() and sep_remainder and isinstance(sep_remainder[0], Group):
                             part.extend( [*_tokenize_part(a), EVAL] )
@@ -275,6 +293,7 @@ class PNParser:
             value, = part  # unpack
             return self.PRIMITIVES_TABLE[type(value)](self, value)
         else:
+            # print(part, prios)
             i, (prio, func) = max(enumerate(prios), key=_prios_keyfunc)
             # assert (prio, func) != NO_OPERATOR
             return func(self, part[:i], part[i+1:], prios[:i], prios[i+1:])
Possible regex-based token partitioning:
The two RE-based approaches are similar in performance, but slightly slower than the current simple char loop.
On the other hand, RE-based partitioning would be a lot more powerful and flexible: it might allow for alphabetic split tokens (with a lookahead check against variable names), multi-char tokens, etc. Not that we need it, though.
That said, actually using these more advanced semantics might come at some additional performance cost.
Update: now applied to the above code
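
For orientation, a minimal sketch of the re.finditer()-based variant hinted at in the partition_by_tokens() comment might look like the following. This is an illustration under assumed names (partition_by_tokens_finditer is hypothetical), not the content of the collapsed diffs above:

def partition_by_tokens_finditer(text, regex, callback=None):
    # Hypothetical equivalent of partition_by_tokens(): walk over token matches and
    # emit the plain text between them (optionally passed through callback), followed
    # by the token itself; empty parts are dropped.
    parts = []
    cursor = 0
    for m in re.finditer(regex, text):
        plain = text[cursor:m.start()]
        if callback:
            plain = callback(plain)
        if plain != "":
            parts.append(plain)
        parts.append(Token(m.group(0)))
        cursor = m.end()
    tail = text[cursor:]
    if callback:
        tail = callback(tail)
    if tail != "":
        parts.append(tail)
    return parts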