Skip to content

Instantly share code, notes, and snippets.

@john-h-k
Last active March 19, 2024 12:27
Show Gist options
  • Select an option

  • Save john-h-k/da1b2692683075e8267ea09b330beaef to your computer and use it in GitHub Desktop.

Select an option

Save john-h-k/da1b2692683075e8267ea09b330beaef to your computer and use it in GitHub Desktop.
from __future__ import annotations
from typing import List, TypeVar, Generic, Callable, Dict, Any, TextIO, Iterable, Tuple
from functools import wraps, partial
from dataclasses import dataclass
from enum import Enum
from copy import deepcopy
import re
import operator
import importlib
import os
def static_local(**statics):
def _impl(func):
@wraps(func)
def __impl(*args, **kwargs):
marker = "__static_local_init_done"
if not hasattr(func, marker):
for key, val in statics.items():
setattr(func, key, val)
setattr(func, marker, None)
return func(*args, **kwargs)
return __impl
return _impl
def promote_ex(ex_from, ex_to, call, **ex_args):
try:
call()
except Exception as e:
if isinstance(e, ex_from):
raise ex_to(**ex_args) from e
raise e
class Colors:
BLACK = '\033[30m'
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
BLUE = '\033[34m'
MAGENTA = '\033[35m'
CYAN = '\033[36m'
WHITE = '\033[37m'
UNDERLINE = '\033[4m'
RESET = '\033[0m'
def splitmany(text : str, *seperators : str):
regular_exp = '|'.join(map(re.escape, seperators))
return re.split(regular_exp, text)
def splitone(collection : str, value : str) -> Tuple[str, str]:
if value not in collection:
return [collection]
index = collection.index(value)
return collection[:index], collection[index+1:]
TValue = TypeVar("TValue")
class IgnoreCaseDict(Dict[str, TValue]):
class _IgnoreCaseKey(str):
def __init__(self, value : str):
self.__value = value
def __hash__(self):
return hash(self.__value.lower())
def __eq__(self, other):
return self.__value.lower() == other.__value.lower() if isinstance(other, IgnoreCaseDict._IgnoreCaseKey) else self.__value.lower() == other.lower()
def __init__(self, initial={}):
for key, val in initial.items():
self[key] = val
def __getitem__(self, key):
return super().__getitem__(self._IgnoreCaseKey(key))
def __setitem__(self, key, value):
return super().__setitem__(self._IgnoreCaseKey(key), value)
def __contains__(self, key):
return super().__contains__(self._IgnoreCaseKey(key))
T = TypeVar("T")
class Stack(Generic[T]):
__capacity : int
__count : int
__data : List[T]
DefautCapacity : int = 64
def __init__(self, capacity=DefautCapacity):
self.__capacity = capacity
self.__count = 0
self.__data = []
def push(self, val) -> None:
if len(self.__data) == self.__count:
self.__data.append(None)
self.__data[self.__count] = val
self.__count += 1
def pop(self) -> T:
val = self.peek()
self.__count -= 1
return val
def peek(self) -> T:
if self.is_empty():
raise RuntimeError("Stack is empty")
return self.__data[self.__count - 1]
def clear(self) -> None:
self.__count = 0
def get_at(self, i : int) -> T:
return self.__data[i]
def count(self) -> T:
return self.__count
def is_empty(self) -> bool:
return self.__count == 0
def trim(self) -> None:
self.__data = self.__data[:self.__count]
class RpnStackEmptyError(Exception):
def __init__(self, stack_depth, required_depth):
self.stack_depth = stack_depth
self.required_depth = required_depth
message = f"Stack has depth {stack_depth} but operation required stack depth {required_depth}" if stack_depth > 0 else "Stack was empty"
self.message = message
super().__init__(message)
class RpnStackInvalidArgumentError(Exception):
def __init__(self, message):
self.message = message
super().__init__(message)
class RpnEvaluationStack:
PARAMETER_COUNT_MARKER = "parameter_count"
__stack : Stack[Any]
def __init__(self):
self.__stack = Stack()
def ensure_depth(self, depth):
if depth > self.__stack.count():
raise RpnStackEmptyError(self.__stack.count(), depth)
def push(self, value):
self.__stack.push(value)
def operator(self, callable : Callable[..., Any]):
from inspect import signature, Parameter
params = signature(callable).parameters
arg_count = getattr(callable, RpnEvaluationStack.PARAMETER_COUNT_MARKER) if len(params) == 1 and list(params.values())[0].kind == Parameter.VAR_POSITIONAL else len(params)
self.ensure_depth(arg_count)
result = callable(*reversed([self.__stack.pop() for _ in range(arg_count)]))
if result != None:
self.push(result)
def top(self) -> Any:
self.ensure_depth(1)
return self.__stack.peek()
def clear(self) -> None:
self.__stack.clear()
def pop(self) -> Any:
self.ensure_depth(1)
return self.__stack.pop()
def get_at(self, i : int) -> Any:
self.ensure_depth(i)
return self.__stack.get_at(i)
def stack_depth(self) -> int:
return self.__stack.count()
def select(left, right, if_true, if_false, comparison):
return if_true if comparison(left, right) else if_false
def range_select(left, right, if_greater, if_equal, if_less):
if left == right:
return if_equal
return if_greater if left > right else if_less
class RpnComputer:
__evaluator : RpnEvaluationStack
__operators : Dict[str, Callable[..., Any]]
__commands : Dict[str, Tuple[Callable[..., Any], str]]
__aliases : Dict[str, str]
__in : TextIO
__out : TextIO
__err : TextIO
__DefaultOperators : Dict[str, Callable[..., Any]] = IgnoreCaseDict({
"+" : operator.add,
"-" : operator.sub,
"*" : operator.mul,
"/" : operator.floordiv,
"^" : operator.pow,
"==" : partial(select, comparison=operator.eq),
"!=" : partial(select, comparison=operator.ne),
">" : partial(select, comparison=operator.gt),
">=" : partial(select, comparison=operator.ge),
"<=" : partial(select, comparison=operator.le),
"<=>" : range_select,
"pop" : lambda popped_value: None, # takes 1 value, but pushes non
})
__DefaultAliases : Dict[str, str] = IgnoreCaseDict({
"select_eq" : "==",
"select_neq" : "!=",
"select_gt" : ">",
"select_gte" : ">=",
"select_lt" : "<",
"select_lte" : "<=",
"select_range" : "<=>",
"add" : "+",
"sub" : "-",
"div" : "/",
"mul" : "*",
"pow" : "^",
})
__DefaultCommands : Dict[str, Tuple[Callable[..., Any], str]] = IgnoreCaseDict({
"CLEAR" : (lambda e, _: e.__evaluator.clear(), "'CLEAR' - Clear the stack"),
"PY_OP" : (lambda e, a: e.add_python_operator(a), "'PY_OP name [args]: expr' - Create a python operator"),
"RPN_OP" : (lambda e, a: e.add_rpn_operator(a), "'RPN_OP name [args]: expr' - Create an RPN operator"),
"ALIAS" : (lambda e, a: e.add_alias(a), "'ALIAS target name' - Create a new alias from 'name' to 'target'"),
"TOP" : (lambda e, _: e.show_result(e.__evaluator.top()), "'TOP' - Show the top of the stack"),
"AT" : (lambda e, a: e.get_at(a), "'AT point' - Show the value at 'point'"),
"DEPTH" : (lambda e, _: e.write_out(f"Depth: {e.__evaluator.stack_depth()}"), "'DEPTH' - Show the depth of the stack"),
"STACK" : (lambda e, a: e.show_stack(a), "'STACK [lo] [hi]' - Show the stack state, optionally from lower bound 'lo' to upper bound 'hi'. When ommitted, lo is 0 and hi is the max depth"),
"HELP" : (lambda e, a: e.show_help(a), "'HELP [-o/-od/-a/-ad]' - Show help for an option, or show help for the options")
})
from sys import stdin, stdout, stderr
def __init__(self, _in=stdin, _out=stdout, _err=stderr, write_in_to_out=False, cursor=False):
self.__operators = deepcopy(type(self).__DefaultOperators)
self.__commands = deepcopy(type(self).__DefaultCommands)
self.__aliases = deepcopy(type(self).__DefaultAliases)
self.__evaluator = RpnEvaluationStack()
self.__write_in_to_out = write_in_to_out
self.__in = _in
self.__out = _out
self.__out_color = Colors.CYAN
self.__err = _err
self.__err_color = Colors.RED
self.__debug_color = Colors.GREEN
self.__cursor = cursor
@dataclass
class _SignatureInfo:
name : str
arg_names : str
def __str__(self) -> str:
return f"{self.name} {' '.join(self.arg_names)}"
@staticmethod
def __parse_sig(args : str) -> Tuple[_SignatureInfo, str]:
from collections import namedtuple
split = args.split(":")
if len(split) != 2:
raise RpnStackInvalidArgumentError(f"Operator definition expected 1 colon delimiting signature from expression (but found {len(split) - 1}")
op, cmd = split
op = op.strip()
name, sig = splitone(op, " ")
arg_names = sig.split(" ")
return RpnComputer._SignatureInfo(name, arg_names), cmd
class _RpnOperator:
class OperationType(Enum):
Constant = 0
Operator = 1
ResolveVariable = 2
__rpnComputer : RpnComputer
__commands : Tuple[OperationType, Any]
__signature : _SignatureInfo
def __init__(self, computer : RpnComputer, sig : _SignatureInfo, cmds : str):
self.__rpnComputer = computer
self.__signature = sig
self.__commands = []
OperationType = RpnComputer._RpnOperator.OperationType
for cmd in filter(lambda x: x.strip() != "", splitmany(cmds, ",", " ")):
try:
self.__commands.append((OperationType.Constant, int(cmd)))
except ValueError:
if computer.is_valid_operator(cmd):
self.__commands.append((OperationType.Operator, cmd))
else:
if not cmd in sig.arg_names:
computer.write_err(f"Invalid argument '{cmd}' for func '{sig}'")
self.__commands.append((OperationType.ResolveVariable, sig.arg_names.index(cmd)))
def run(self, variables : List[int]) -> Any:
OperationType = RpnComputer._RpnOperator.OperationType
evaluator = RpnEvaluationStack()
for command in self.__commands:
if command[0] == OperationType.Constant:
evaluator.push(command[1])
elif command[0] == OperationType.Operator:
evaluator.operator(self.__rpnComputer.get_operator(command[1]))
elif command[0] == OperationType.ResolveVariable:
evaluator.push(variables[command[1]])
return evaluator.top()
def is_valid_operator(self, cmd : str) -> bool:
return cmd in self.__operators
def get_operator(self, cmd : str) -> Callable:
return self.__operators[cmd]
def get_at(self, args : str):
try:
i = int(args)
except ValueError:
raise RpnStackInvalidArgumentError("'AT' expected a single integer as an argument, but did not receive one")
if i < 0 or i > self.__evaluator.stack_depth() - 1:
raise RpnStackInvalidArgumentError(f"'AT' received an index of {i}, but this is outside the bounds of the stack (lo=0, hi={max(0, self.__evaluator.stack_depth() -1)})")
self.show_result(self.__evaluator.get_at(i))
def add_alias(self, args : str) -> None:
split = args.split(" ")
if len(split) != 2:
raise RpnStackInvalidArgumentError(f"Command 'ALIAS target name' expects 2 arguments but received {len(split)}")
target, name = split
if name == target:
self.write_err(f"Cannot alias '{name}' to itself")
return
if name in self.__aliases:
self.write_err(f"Cannot add alias '{name}' as it already exists (to target '{self.__aliases[name]})'")
return
self.__aliases[new] = target
def add_rpn_operator(self, args : str):
sig, cmd = type(self).__parse_sig(args)
rpnOperator = RpnComputer._RpnOperator(self, sig, cmd)
result = lambda *args: rpnOperator.run(list(args))
setattr(result, RpnEvaluationStack.PARAMETER_COUNT_MARKER, len(sig.arg_names))
self.__operators[sig.name] = result
def add_python_operator(self, args : str):
sig, cmd = type(self).__parse_sig(args)
try:
math = importlib.import_module("math")
res = eval(f"lambda {','.join(sig.arg_names)}: {cmd}", {"math":math})
except Exception as e:
self.write_err(f"Invalid python operator expression '{cmd}'")
self.write_err(f"[PY]> {str(e)}")
self.__operators[sig.name] = res
def write_debug(self, debug):
self.__err.write(self.__format("[RPN DEBUG]> ", self.__debug_color, debug))
def __format(self, prefix, color, message):
return color + prefix + f"\n{prefix}".join(filter(lambda x: x.strip() != "", message.split("\n"))) + Colors.RESET + "\n"
def write_err(self, err, close=False):
self.__err.write(self.__format("[RPN ERROR]> ", self.__err_color, err))
if close:
exit(-1)
def write_out(self, text):
self.__err.write(self.__format("[RPN]> ", self.__out_color, text))
def show_result(self, result):
self.write_out(f"Result: {result}")
def show_help(self, opts):
def _format_list(vals):
vals = list(vals)
return " * " + "\n * ".join(vals) if len(vals) > 0 else " [None]"
def _get_aliases(v):
aliases = [name for name, target in self.__aliases.items() if target == v]
return f" [Aliases: {', '.join(aliases)}]" if len(aliases) > 0 else ""
if opts == "" or opts == None:
s = (
"John's Reverse Polish Notation Engine: \n"
" * The engine is case insensitive\n"
" * You can have one command per line, or one or more operators/constants, space or comma seperated"
" * Type help -c to view all commands\n"
" * Type help -o to view all operators, or help -od to view only default operators\n"
" * Type help -a to view all aliases, or help -ad to view only default aliases\n"
" * Type help <command> to view help for a specific command"
" * Add a semicolon anywhere to start a comment\n"
)
elif opts == "-c":
s = "All engine commands: \n" + _format_list(map(lambda x: x[1], self.__commands.values()))
elif opts == "-o" or opts == "-od":
s = "All default engine operators: \n" + _format_list((x + _get_aliases(x) for x in self.__DefaultOperators.keys()))
if opts == "-o":
s += "\nAll user-defined engine operators: \n" + _format_list((x + _get_aliases(x) for x in (self.__operators.keys() - self.__DefaultOperators.keys())))
elif opts == "-a" or opts == "-ad":
s = "All default engine aliases: \n" + _format_list((f"{x} -> {self.__aliases[x]}" for x in self.__DefaultAliases.keys()))
if opts == "-a":
s += "\nAll user-defined engine aliases: \n" + _format_list((f"{x} -> {self.__aliases[x]}" for x in (self.__aliases.keys() - self.__DefaultAliases.keys())))
elif opts in self.__commands:
s = f" * {self.__commands[opts][1]}\n"
else:
self.write_err(f"Unrecognised option '{opts}' for HELP command")
return
self.write_debug(s)
def show_stack(self, maybe_range):
maybe_range = "" if maybe_range == None else maybe_range
s = maybe_range.split(" ")
lo = 0
hi = self.__evaluator.stack_depth()
if len(s) >= 1 and len(s[0]) > 0:
lo = int(s[0])
if len(s) == 2:
hi = int(s[1])
if lo < 0 or lo > hi:
self.write_debug(f"Invalid stack lo point ({lo})")
if hi < 0 or hi > self.__evaluator.stack_depth():
self.write_debug(f"Invalid stack hi point ({hi})")
self.write_debug("START OF STACK")
for i in range(lo, hi):
self.write_debug(f"{i}: {self.__evaluator.get_at(i)}")
self.write_debug("END OF STACK")
def get_input(self):
for inp in self.__in:
if self.__write_in_to_out:
self.__out.write(inp + "\n")
yield inp
def run(self):
def _next_line():
if self.__cursor:
self.__out.write("> ")
self.__out.flush()
_next_line()
for line in map(lambda x: x[:-1] if len(x) > 0 and x[-1] == "\n" else x, self.get_input()):
line = line.split(";")[0].strip()
try:
if " " in line:
cmd, args = splitone(line, " ")
else:
cmd, args = line, None
if cmd == "":
_next_line()
continue
while cmd in self.__aliases:
cmd = self.__aliases[cmd]
if cmd.lower() == "quit":
self.show_result(self.__evaluator.top())
return
if cmd in self.__commands:
self.__commands[cmd][0](self, args)
_next_line()
continue
for subcmd in splitmany(line, " ", ","):
if subcmd in self.__operators:
top = self.__evaluator.top()
self.__evaluator.operator(self.__operators[subcmd])
if self.__evaluator.stack_depth() == 0:
self.show_result(top)
else:
try:
self.__evaluator.push(float(subcmd))
except ValueError:
self.write_err(f"'{subcmd}' was not recognised as an alias, command, operator, or valid constant")
_next_line()
except (RpnStackEmptyError, RpnStackInvalidArgumentError) as e:
self.write_err(e.message)
_next_line()
cmds = """RPN_OP square a: a,a,*
PY_OP sqrt a: math.sqrt(a)
5
5
square
sqrt
"""
example_program = """
RPN_OP discrim a b c: b 2 ^ a c * 4 * -
RPN_OP numroots a b c: a b c discrim 0 2 1 0 <=>
; x^2
1 ; a
0 ; b
0 ; c
numroots
pop ; expected: 1
; x^2 - x + 9
1 ; a
-1 ; b
9 ; c
numroots
pop ; expected: 0
; x^2 + 10x + 5
1 ; a
10 ; b
5 ; c
numroots
pop ; expected: 2
"""
from sys import stdin
os.system("") # required for colors to work on windows
source, redir = stdin, False
#source, redir = example_program.split("\n"), True
computer : RpnComputer = RpnComputer(_in=source, write_in_to_out=redir, cursor=True)
computer.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment