Last active
March 1, 2022 03:11
-
-
Save weixinfree/a67bf505fd70a267254dea0c782397c7 to your computer and use it in GitHub Desktop.
parser combinator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from abc import abstractmethod | |
| import re | |
| from typing import Callable, Any, List, Sequence, Union, Optional | |
| import logging | |
class State:
    """Read cursor over the text being parsed.

    Holds the source string, the current offset into it, and a stack of
    saved offsets that lets callers try something and rewind on failure.
    """

    def __init__(self, src: str):
        self.src = src
        self._index = 0
        self._size = len(src)
        self._mark = []  # stack of offsets saved by begin()

    @property
    def index(self):
        """Current offset into ``src``."""
        return self._index

    def eof(self):
        """Return True once the cursor is at or past the end of ``src``."""
        return not (self._index < self._size)

    def peek(self, step: int = 1):
        """Return up to ``step`` characters at the cursor without moving it."""
        start = self._index
        return self.src[start : start + step]

    def advance(self, step: int = 1):
        """Move the cursor forward by ``step`` characters (no bounds check)."""
        self._index = self._index + step
        logging.debug(f"state advace: {self._index}")

    def begin(self) -> int:
        """Save the current offset and return the new depth of the mark stack."""
        marks = self._mark
        marks.append(self._index)
        return len(marks)

    def commit(self, level: int):
        """Discard the innermost mark; ``level`` must be the value begin() returned."""
        assert len(self._mark) == level
        self._mark.pop()

    def rollback(self, level: int):
        """Rewind the cursor to the innermost mark and discard that mark."""
        assert len(self._mark) == level
        saved = self._mark.pop()
        self._index = saved
        logging.debug(f"state rollback, {self._index}")

    def diagnose(self):
        """Return a one-line description: offset, size and the next three characters."""
        return f'DIAGNOSE-INFO: State({self.index}-{self._size}), "{self.peek(3)}"'

    def __repr__(self):
        return f"State(index={self._index}, size={self._size})"
class ParseException(Exception):
    """Raised when a parser cannot match the input at the current position.

    Derives from ``Exception`` (not ``BaseException``) so that ordinary
    ``except Exception`` boundaries in calling code see parse failures;
    ``BaseException`` is reserved for interpreter-level exits such as
    ``KeyboardInterrupt`` and ``SystemExit``.

    Args:
        msg: Description of what was expected.
        st: The parse state at the point of failure; its next character and
            its ``repr`` are appended to the message.
    """

    def __init__(self, msg: str, st: "State"):
        super().__init__(msg + f" :context '{st.peek()}', failed at {st}")
# Type of the hooks stored in Parser.leading_processor / Parser.tailing_processor:
# a callable that receives the State and whose return value is not used.
Processor = Callable[["State"], None]
class Parser:
    """A composable parser wrapping a function that maps a ``State`` to a value.

    Operators build new parsers from existing ones:

    * ``p | q``  -- try ``p``; on ``ParseException`` rewind and run ``q``.
    * ``p + q``  -- run ``p`` then ``q`` and collect the results in a tuple.
    * ``~p``     -- run ``p`` but drop its result from ``+`` sequences.
    * ``p >> f`` -- run ``p`` and return ``f(result)`` (same as ``p.bind(f)``).
    """

    # Hooks run on the State before / after every parser; None disables a hook.
    leading_processor: Optional[Processor] = None  # default skip space
    tailing_processor: Optional[Processor] = None  # default skip space

    def __init__(self, func: Callable[["State"], Any]):
        self._pfunc = func
        self._name = "Parser"

    @property
    def name(self):
        """Human-readable name used in log lines and combinator names."""
        return self._name

    @name.setter
    def name(self, name: str):
        self._name = name

    def named(self, name: str):
        """Set the name and return ``self`` so the call can be chained."""
        self.name = name
        return self

    def __or__(self, other: "Parser"):
        """Return a parser that tries ``self`` and falls back to ``other``."""

        @Parser
        def _or(st: State):
            mark = st.begin()
            try:
                r = self.parse(st)
                st.commit(mark)
                return r
            except ParseException:
                # Undo whatever the first branch consumed before trying the second.
                st.rollback(mark)
                return other.parse(st)

        return _or.named(f"{self.name} | {other.name}")

    def __rshift__(self, other: Callable):
        """Return a parser whose result is ``other(result of self)``."""
        return self.bind(other)

    def __add__(self, other: "Parser"):
        """Return a parser that runs ``self`` then ``other`` and combines results.

        Results produced by ``~parser`` are dropped. If one side is dropped the
        other side's value is returned as is; otherwise the values are
        collected into a flat ``_Tuple`` that grows as more ``+`` are chained.
        """

        @Parser
        def _add(st: State):
            left = self.parse(st)
            right = other.parse(st)
            # Identity, not equality: the sentinel is a singleton, and `==`
            # would invoke arbitrary __eq__ implementations of parsed values.
            left_ignored = left is _IgnoredValue
            right_ignored = right is _IgnoredValue
            if left_ignored and right_ignored:
                # NOTE(review): yields None rather than the ignored sentinel, so
                # `~a + ~b + c` produces (None, c) -- confirm this is intended.
                return None
            if left_ignored:
                return right
            if right_ignored:
                return left
            if isinstance(left, _Tuple):
                # Extend the accumulated sequence. tuple() accepts a single
                # iterable, so concatenate first and then wrap.
                return _Tuple(left + (right,))
            return _Tuple((left, right))

        return _add.named(f"{self._name} + {other._name}")

    def __invert__(self):
        """Return a parser that runs ``self`` but reports the ignored sentinel."""

        @Parser
        def _skip(st: State):
            self.parse(st)
            return _IgnoredValue

        return _skip.named(f"~{self._name}")

    def __call__(self, st):
        """Alias for :meth:`parse`."""
        return self.parse(st)

    def bind(self, mapper):
        """Return a parser whose result is ``mapper(result of self)``.

        The new parser keeps this parser's name.
        """

        @Parser
        def _bind(st: State):
            return mapper(self.parse(st))

        return _bind.named(self._name)

    def parse(self, st):
        """Run this parser.

        Args:
            st: Either an existing ``State`` or a ``str``, which is wrapped in
                a fresh ``State``.

        Returns:
            Whatever the wrapped function returns.

        Raises:
            TypeError: ``st`` is neither a ``State`` nor a ``str``.
            ParseException: the wrapped function could not match the input.
        """
        # Validate the argument before touching the log depth so that a bad
        # call does not leave the depth counter incremented.
        if isinstance(st, State):
            state = st
        elif isinstance(st, str):
            state = State(st)
        else:
            raise TypeError(f"unsupported type: {type(st)}")

        _log_start(f"{self.name}")

        # The hooks are read through the class, not `self`, so the plain
        # function stored there is not bound as a method.
        if Parser.leading_processor:
            Parser.leading_processor(state)

        try:
            r = self._pfunc(state)
        except ParseException:
            _log_end(f"{self.name}, failed at: {state.diagnose()}")
            raise

        if Parser.tailing_processor:
            Parser.tailing_processor(state)

        _log_end(f"{self.name}, result: {r}")
        return r
@Parser
def finished(st: State):
    """Succeed with ``("Finished",)`` only when no input remains."""
    if not st.eof():
        raise ParseException("except finished", st)
    return ("Finished",)


finished.named("finished")
def a(char: str) -> Parser:
    """Build a parser that matches the literal text ``char`` and returns it."""

    def _literal(st: State) -> Any:
        width = len(char)
        if st.peek(width) != char:
            raise ParseException(f"except {char}", st)
        st.advance(width)
        return char

    return Parser(_literal).named(f'a("{char}")')
def regex(pat: Union[str, re.Pattern]) -> Parser:
    """Build a parser that matches ``pat`` at the cursor and returns the matched text.

    ``pat`` may be a pattern string or an already compiled pattern.
    """
    if isinstance(pat, re.Pattern):
        compiled: re.Pattern = pat
    else:
        compiled = re.compile(pat)

    def _match_here(st: State):
        found = compiled.match(st.src, st.index)
        if found is None:
            raise ParseException(f"except regex({pat})", st)
        # Consume exactly the matched span.
        st.advance(found.end() - found.start())
        return found.group(0)

    return Parser(_match_here).named(f"regex({pat})")
# One or more decimal digits, converted to int.
digit: Parser = regex(r"\d+").bind(int).named("digit")
# Optionally negative run of decimal digits, converted to int.
java_int: Parser = regex(r"-?\d+").bind(int).named("java-int")
@Parser
def space(st: State) -> tuple:
    """Consume whitespace at the cursor and return ``("space", count)``."""
    skipped = _skip_space(st)
    return ("space", skipped)


space.named("Space")
# Verbose-mode pattern for a decimal number: optional sign, integer part
# without leading zeros, optional fraction, optional exponent.
_float_pat = re.compile(
    r"""
    -? # Minus
    (0|([1-9][0-9]*)) # Int
    (\.[0-9]+)? # Frac
    ([Ee][+-]?[0-9]+)? # Exp
    """,
    re.VERBOSE,
)

# Matches _float_pat and converts the matched text to float.
java_float = regex(_float_pat).named("java_float").bind(float)
# Verbose-mode regex fragments that are spliced into _string_pat below.
# Each fragment ends with a newline so its trailing `#` comment is terminated
# before the text that follows it in the combined pattern.
_regexps = {
    # A backslash followed by a standard escape character or uXXXX (four hex digits).
    "escaped": r"""
        \\ # Escape
        ((?P<standard>["\\/bfnrt]) # Standard escapes
        | (u(?P<unicode>[0-9A-Fa-f]{4}))) # uXXXX
        """,
    # Any single character other than a double quote or a backslash.
    "unescaped": r"""
        [^"\\] # Unescaped: avoid ["\\]
        """,
}

# A double-quoted string literal: any mix of unescaped characters and escapes.
_string_pat = re.compile(r'"(%(unescaped)s | %(escaped)s)*"' % _regexps, re.VERBOSE)
def _remove_string_mark(s):
    """Return ``s`` without its first and last characters (the surrounding quotes)."""
    inner = slice(1, -1)
    return s[inner]
# Matches a double-quoted string literal and returns its content with the
# surrounding quotes removed; escape sequences are left as written.
string = regex(_string_pat).bind(_remove_string_mark).named("string")
def pure(value: Any):
    """Build a parser that consumes nothing itself and always yields ``value``."""

    def _constant(st: State):
        return value

    return Parser(_constant).named(f"pure({value})")
def maybe(p: Parser) -> Parser:
    """Build a parser that yields ``p``'s result, or ``None`` when ``p`` fails."""
    fallback = pure(None)
    optional = p | fallback
    optional.name = f"maybe({p.name})"
    return optional
def oneplus(p: Parser) -> Parser:
    """Build a parser that applies ``p`` one or more times.

    The first application is mandatory, so its ``ParseException`` propagates.
    Further applications are attempted until ``p`` fails; the failed attempt
    is rewound. Repetition also stops when an application succeeds without
    moving the cursor, because such a parser would otherwise succeed forever
    and the loop would never end. That zero-width result is discarded.

    Returns:
        A parser yielding the list of collected results (at least one item).
    """

    @Parser
    def _oneplus(st: State):
        results = [p.parse(st)]
        while True:
            mark = st.begin()
            start = st.index
            try:
                value = p.parse(st)
            except ParseException:
                st.rollback(mark)
                break
            if st.index == start:
                # No progress: stop instead of looping forever.
                st.rollback(mark)
                break
            st.commit(mark)
            results.append(value)
        return results

    return _oneplus.named(f"oneplus({p.name})")
def many(p: Parser) -> Parser:
    """Build a parser that applies ``p`` zero or more times.

    Applications are attempted until ``p`` fails; the failed attempt is
    rewound. Repetition also stops when an application succeeds without
    moving the cursor, because such a parser would otherwise succeed forever
    and the loop would never end. That zero-width result is discarded.

    Returns:
        A parser yielding the (possibly empty) list of collected results.
    """

    @Parser
    def _many(st: State):
        results = []
        while True:
            mark = st.begin()
            start = st.index
            try:
                value = p.parse(st)
            except ParseException:
                st.rollback(mark)
                break
            if st.index == start:
                # No progress: stop instead of looping forever.
                st.rollback(mark)
                break
            st.commit(mark)
            results.append(value)
        return results

    return _many.named(f"many({p.name})")
def sepby(body: Parser, sep: Parser) -> Parser:
    """Build a parser for one ``body`` followed by ``sep``-separated ``body`` items.

    The first ``body`` is mandatory. After each successful ``sep`` another
    ``body`` is attempted; if that attempt fails only the ``body`` attempt is
    rewound (the separator stays consumed) and the loop looks for the next
    separator. Parsing ends at the first position where ``sep`` fails.

    Returns:
        A parser yielding the list of ``body`` results.
    """

    def _items(st: State):
        collected = [body.parse(st)]
        while True:
            sep_mark = st.begin()
            try:
                sep.parse(st)
            except ParseException:
                st.rollback(sep_mark)
                return collected
            else:
                st.commit(sep_mark)

            body_mark = st.begin()
            try:
                item = body.parse(st)
            except ParseException:
                st.rollback(body_mark)
            else:
                collected.append(item)
                st.commit(body_mark)

    return Parser(_items).named(f"sepby({body.name}, {sep.name})")
class ForwordDecl(Parser):
    """Placeholder parser whose real implementation is supplied later via define()."""

    def define(self, p: Parser):
        """Attach the parser that this placeholder should delegate to."""
        self.real_parser = p

    def parse(self, st):
        """Delegate to the defined parser; fail if define() has not been called."""
        target = getattr(self, "real_parser", None)
        if target:
            return target.parse(st)
        raise NotImplementedError("call define() first")
def forward_decl() -> Parser:
    """Create an undefined ForwordDecl placeholder named "forward-decl"."""

    def _never_called(st: State):
        # ForwordDecl.parse delegates elsewhere, so this body is a stub.
        assert False

    placeholder = ForwordDecl(_never_called)
    placeholder.name = "forward-decl"
    return placeholder
def const(v: Any) -> Callable:
    """Return a one-argument function that ignores its argument and returns ``v``."""

    def _always(input: Any):
        return v

    return _always
| ########## remove leading/trailing useless chars | |
def _skip_space(st: "State"):
    """Advance ``st`` past consecutive whitespace and return how many characters were skipped."""
    skipped = 0
    while True:
        if st.eof() or not st.peek().isspace():
            return skipped
        st.advance()
        skipped += 1
# Alias for the whitespace skipper, used as a Parser hook.
SkipSpaceProcessor = _skip_space
# Install it as both hooks: whitespace is skipped before and after every parser runs.
Parser.leading_processor = SkipSpaceProcessor
Parser.tailing_processor = SkipSpaceProcessor
| ########### special value | |
class _Ignored:
    """Type of the sentinel returned by parsers whose result should be dropped."""

    def __repr__(self) -> str:
        return "ignored"


# The single sentinel instance that results are compared against.
_IgnoredValue = _Ignored()


class _Tuple(tuple):
    """Tuple subclass marking a result sequence built by ``+``, as opposed to a tuple a parser returned itself."""
| ############ log | |
# Current nesting depth of parser calls, used to indent debug log lines.
_log_level = 1


def _log_start(msg: str):
    """Log ``msg`` as an opening line at the current depth, then go one level deeper."""
    global _log_level
    depth = _log_level
    prefix = f"{'-' * depth}[{depth}]>>> "
    logging.debug(prefix + msg)
    _log_level = depth + 1


def _log_end(msg: str):
    """Go one level back up, then log ``msg`` as a closing line at that depth."""
    global _log_level
    _log_level -= 1
    depth = _log_level
    prefix = f"{'-' * depth}[{depth}]<<< "
    logging.debug(prefix + msg)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment