Skip to content

Instantly share code, notes, and snippets.

@weixinfree
Last active March 1, 2022 03:11
Show Gist options
  • Select an option

  • Save weixinfree/a67bf505fd70a267254dea0c782397c7 to your computer and use it in GitHub Desktop.

Select an option

Save weixinfree/a67bf505fd70a267254dea0c782397c7 to your computer and use it in GitHub Desktop.
parser combinator
from abc import abstractmethod
import re
from typing import Callable, Any, List, Sequence, Union, Optional
import logging
class State:
def __init__(self, src: str):
self.src = src
self._index = 0
self._size = len(src)
self._mark = []
def diagnose(self):
return f'DIAGNOSE-INFO: State({self.index}-{self._size}), "{self.peek(3)}"'
@property
def index(self):
return self._index
def eof(self):
return self._index >= self._size
def peek(self, step: int = 1):
return self.src[self._index : self._index + step]
def advance(self, step: int = 1):
self._index += step
logging.debug(f"state advace: {self._index}")
def begin(self) -> int:
self._mark.append(self.index)
return len(self._mark)
def commit(self, level: int):
assert level == len(self._mark)
self._mark.pop()
def rollback(self, level: int):
assert level == len(self._mark)
self._index = self._mark.pop()
logging.debug(f"state rollback, {self._index}")
def __repr__(self):
return f"State(index={self._index}, size={self._size})"
class ParseException(BaseException):
def __init__(self, msg: str, st: State):
super().__init__(msg + f" :context '{st.peek()}', failed at {st}")
Processor = Callable[["State"], None]
class Parser:
leading_processor: Processor = None # default skip space
tailing_processor: Processor = None # default skip space
def __init__(self, func: Callable[["State"], Any]):
self._pfunc = func
self._name = "Parser"
@property
def name(self):
return self._name
@name.setter
def name(self, name: str):
self._name = name
def named(self, name: str):
self.name = name
return self
def __or__(self, other: "Parser"):
@Parser
def _or(st: State):
mark = st.begin()
try:
r = self.parse(st)
st.commit(mark)
return r
except ParseException:
st.rollback(mark)
return other.parse(st)
_or.name = f"{self.name} | {other.name}"
return _or
def __rshift__(self, other: Callable):
@Parser
def _shift(st: State):
return other(self.parse(st))
return _shift.named(self.name)
def __add__(self, other: "Parser"):
@Parser
def _add(st: State):
left = self.parse(st)
right = other.parse(st)
if left == right == _IgnoredValue:
return None
if left == _IgnoredValue:
return right
if right == _IgnoredValue:
return left
if isinstance(left, _Tuple):
return _Tuple(left, (right,))
return _Tuple((left, right))
return _add.named(f"{self._name} + {other._name}")
def __invert__(self):
@Parser
def _skip(st: State) -> "Parser":
self.parse(st)
return _IgnoredValue
return _skip.named(f"~{self._name}")
def __call__(self, st):
return self.parse(st)
def bind(self, mapper):
@Parser
def _bind(st: State):
return mapper(self.parse(st))
return _bind.named(self._name)
def parse(self, st):
_log_start(f"{self.name}")
state = None
if isinstance(st, State):
state = st
elif isinstance(st, str):
state = State(st)
else:
raise Exception(f"unsupported type: {type(st)}")
if Parser.leading_processor:
Parser.leading_processor(state)
try:
r = self._pfunc(state)
except ParseException:
_log_end(f"{self.name}, failed at: {state.diagnose()}")
raise
if Parser.tailing_processor:
Parser.tailing_processor(state)
_log_end(f"{self.name}, result: {r}")
return r
@Parser
def finished(st: State):
if st.eof():
return ("Finished",)
else:
raise ParseException("except finished", st)
finished.name = "finished"
def a(char: str) -> Parser:
@Parser
def _a(st: State) -> Any:
step = len(char)
if st.peek(step) == char:
st.advance(step)
return char
else:
raise ParseException(f"except {char}", st)
_a.name = f'a("{char}")'
return _a
def regex(pat: Union[str, re.Pattern]) -> Parser:
pattern: re.Pattern = pat if isinstance(pat, re.Pattern) else re.compile(pat)
@Parser
def _regex(st: State):
m = pattern.match(st.src, st.index)
if not m:
raise ParseException(f"except regex({pat})", st)
begin, end = m.span()
st.advance(end - begin)
return st.src[begin:end]
return _regex.named(f"regex({pat})")
digit: Parser = regex(r"\d+").bind(lambda x: int(x)).named("digit")
java_int: Parser = regex(r"-?\d+").bind(lambda x: int(x)).named("java-int")
@Parser
def space(st: State) -> Parser:
return ("space", _skip_space(st))
space.name = "Space"
_float_pat = re.compile(
r"""
-? # Minus
(0|([1-9][0-9]*)) # Int
(\.[0-9]+)? # Frac
([Ee][+-]?[0-9]+)? # Exp
""",
re.VERBOSE,
)
java_float = regex(_float_pat).named("java_float").bind(lambda x: float(x))
_regexps = {
"escaped": r"""
\\ # Escape
((?P<standard>["\\/bfnrt]) # Standard escapes
| (u(?P<unicode>[0-9A-Fa-f]{4}))) # uXXXX
""",
"unescaped": r"""
[^"\\] # Unescaped: avoid ["\\]
""",
}
_string_pat = re.compile(r'"(%(unescaped)s | %(escaped)s)*"' % _regexps, re.VERBOSE)
def _remove_string_mark(s):
return s[1:-1]
string = regex(_string_pat).bind(_remove_string_mark).named("string")
def pure(value: Any):
@Parser
def _pure(st: State):
return value
return _pure.named(f"pure({value})")
def maybe(p: Parser) -> Parser:
return (p | pure(None)).named(f"maybe({p.name})")
def oneplus(p: Parser) -> Parser:
@Parser
def _oneplus(st: State):
li = [p.parse(st)]
while True:
mark = st.begin()
try:
li.append(p.parse(st))
st.commit(mark)
except ParseException:
st.rollback(mark)
break
return li
return _oneplus.named(f"oneplus({p.name})")
def many(p: Parser) -> Parser:
@Parser
def _many(st: State):
li = []
while True:
mark = st.begin()
try:
li.append(p.parse(st))
st.commit(mark)
except ParseException:
st.rollback(mark)
break
return li
return _many.named(f"many({p.name})")
def sepby(body: Parser, sep: Parser) -> Parser:
@Parser
def _sepby(st: State):
li = [body.parse(st)]
while True:
mark = st.begin()
try:
sep.parse(st)
st.commit(mark)
except ParseException:
st.rollback(mark)
break
mark = st.begin()
try:
li.append(body.parse(st))
st.commit(mark)
except ParseException:
st.rollback(mark)
return li
return _sepby.named(f"sepby({body.name}, {sep.name})")
class ForwordDecl(Parser):
def define(self, p: Parser):
self.real_parser = p
def parse(self, st):
parser = getattr(self, "real_parser", None)
if not parser:
raise NotImplementedError("call define() first")
return parser.parse(st)
def forward_decl() -> Parser:
def empty(st: State):
assert False
return ForwordDecl(empty).named("forward-decl")
def const(v: Any) -> Callable:
def func(input: Any):
return v
return func
########## remove leading/tailing useless chars
def _skip_space(st: "State"):
count = 0
while not st.eof() and st.peek().isspace():
st.advance()
count += 1
return count
SkipSpaceProcessor = _skip_space
Parser.leading_processor = SkipSpaceProcessor
Parser.tailing_processor = SkipSpaceProcessor
########### special value
class _Ignored:
def __repr__(self):
return "ignored"
_IgnoredValue = _Ignored()
class _Tuple(tuple):
pass
############ log
_log_level = 1
def _log_start(msg: str):
global _log_level
indent = "-" * _log_level + f"[{_log_level}]>>> "
logging.debug(indent + msg)
_log_level += 1
def _log_end(msg: str):
global _log_level
_log_level -= 1
indent = "-" * _log_level + f"[{_log_level}]<<< "
logging.debug(indent + msg)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment