Last active
March 19, 2024 12:27
-
-
Save john-h-k/da1b2692683075e8267ea09b330beaef to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from __future__ import annotations | |
| from typing import List, TypeVar, Generic, Callable, Dict, Any, TextIO, Iterable, Tuple | |
| from functools import wraps, partial | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from copy import deepcopy | |
| import re | |
| import operator | |
| import importlib | |
| import os | |
def static_local(**statics):
    """Decorator factory that gives a function "static local" attributes.

    Every keyword argument becomes an attribute on the decorated function,
    so the function body can keep state between calls by reading and
    writing ``function_name.attribute``.

    The attributes are set on the wrapper returned by the decorator, because
    that is the object the function's own name is bound to after decoration.
    They are also set on the undecorated function so code reaching it through
    ``__wrapped__`` still sees the initial values.
    """
    def _impl(func):
        @wraps(func)
        def __impl(*args, **kwargs):
            return func(*args, **kwargs)

        for key, val in statics.items():
            setattr(__impl, key, val)
            setattr(func, key, val)
        return __impl
    return _impl
def promote_ex(ex_from, ex_to, call, **ex_args):
    """Run ``call()`` and translate one exception type into another.

    Args:
        ex_from: Exception class (or tuple of classes) to translate.
        ex_to: Exception class raised in its place; built as ``ex_to(**ex_args)``.
        call: Zero-argument callable to invoke.
        **ex_args: Keyword arguments forwarded to the ``ex_to`` constructor.

    Returns:
        Whatever ``call()`` returns.

    Raises:
        ex_to: When ``call()`` raises an ``Exception`` that is an instance of
            ``ex_from``; the original exception is chained as ``__cause__``.
        Exception: Any other exception from ``call()`` propagates unchanged.
    """
    try:
        return call()
    except Exception as e:
        if isinstance(e, ex_from):
            raise ex_to(**ex_args) from e
        # Bare raise keeps the original traceback intact.
        raise
class Colors:
    """ANSI escape sequences used to colour terminal output."""

    # Foreground colours (SGR codes 30-37).
    BLACK = '\033[30m'
    RED = '\033[31m'
    GREEN = '\033[32m'
    YELLOW = '\033[33m'
    BLUE = '\033[34m'
    MAGENTA = '\033[35m'
    CYAN = '\033[36m'
    WHITE = '\033[37m'

    # Text attributes.
    UNDERLINE = '\033[4m'
    RESET = '\033[0m'
def splitmany(text : str, *seperators : str):
    """Split ``text`` on any of the given literal separators.

    Args:
        text: The string to split.
        *seperators: Literal separator strings (not regular expressions).
            Empty separators are ignored.

    Returns:
        The list of pieces, as produced by ``re.split``. When no usable
        separator is supplied the result is ``[text]``.
    """
    # Drop empty separators (an empty pattern would split between every
    # character) and try longer separators first, so that "ab" is preferred
    # over "a" when both are given.
    usable = sorted(dict.fromkeys(s for s in seperators if s), key=len, reverse=True)
    if not usable:
        return [text]
    regular_exp = '|'.join(map(re.escape, usable))
    return re.split(regular_exp, text)
def splitone(collection : str, value : str) -> Tuple[str, str]:
    """Split ``collection`` at the first occurrence of ``value``.

    Args:
        collection: The string to split.
        value: The separator to look for; may be longer than one character.

    Returns:
        A ``(before, after)`` tuple with the separator removed. When the
        separator is empty or does not occur, the result is
        ``(collection, "")`` so callers can always unpack two values.
    """
    if not value:
        # str.partition rejects an empty separator.
        return collection, ""
    before, _, after = collection.partition(value)
    return before, after
TValue = TypeVar("TValue")


class IgnoreCaseDict(Dict[str, TValue]):
    """Dictionary whose string keys are matched case-insensitively.

    String keys are wrapped in ``_IgnoreCaseKey`` before they reach the
    underlying ``dict``; the wrapper hashes and compares on the lower-cased
    text while keeping the original spelling for display. Non-string keys
    are stored unchanged.
    """

    class _IgnoreCaseKey(str):
        """``str`` subclass that hashes and compares on its lower-cased form."""

        def __hash__(self):
            return hash(str.lower(self))

        def __eq__(self, other):
            if isinstance(other, str):
                return str.lower(self) == str.lower(other)
            return NotImplemented

        def __ne__(self, other):
            # str supplies its own case-sensitive __ne__, so it must be
            # overridden to stay consistent with __eq__.
            equal = self.__eq__(other)
            return equal if equal is NotImplemented else not equal

    def __init__(self, initial=None):
        """Create the dictionary, optionally copying items from the mapping ``initial``."""
        super().__init__()
        if initial is not None:
            for key, val in initial.items():
                self[key] = val

    @classmethod
    def _key(cls, key):
        """Wrap string keys for case-insensitive lookup; pass other keys through."""
        return cls._IgnoreCaseKey(key) if isinstance(key, str) else key

    def __getitem__(self, key):
        return super().__getitem__(self._key(key))

    def __setitem__(self, key, value):
        return super().__setitem__(self._key(key), value)

    def __delitem__(self, key):
        return super().__delitem__(self._key(key))

    def __contains__(self, key):
        return super().__contains__(self._key(key))

    def get(self, key, default=None):
        """Return the value for ``key`` (any casing), or ``default`` if absent."""
        return super().get(self._key(key), default)

    def pop(self, key, *default):
        """Remove ``key`` (any casing) and return its value, or ``default`` if given."""
        return super().pop(self._key(key), *default)

    def setdefault(self, key, default=None):
        """Return the value for ``key``, inserting ``default`` first if absent."""
        return super().setdefault(self._key(key), default)

    def update(self, *args, **kwargs):
        """Merge items from mappings, iterables of pairs or keyword arguments."""
        for key, val in dict(*args, **kwargs).items():
            self[key] = val
T = TypeVar("T")


class Stack(Generic[T]):
    """Simple LIFO stack with indexed read access.

    Index 0 is the bottom of the stack (the oldest element) and
    ``count() - 1`` is the top.
    """

    __capacity : int
    __data : List[T]
    DefautCapacity : int = 64

    def __init__(self, capacity=DefautCapacity):
        # The capacity is accepted and stored for interface compatibility;
        # the backing list grows on demand.
        self.__capacity = capacity
        self.__data = []

    def push(self, val) -> None:
        """Place ``val`` on top of the stack."""
        self.__data.append(val)

    def pop(self) -> T:
        """Remove and return the top element.

        Raises:
            RuntimeError: If the stack is empty.
        """
        if self.is_empty():
            raise RuntimeError("Stack is empty")
        # list.pop drops the stored reference, so popped values are not
        # kept alive by the stack.
        return self.__data.pop()

    def peek(self) -> T:
        """Return the top element without removing it.

        Raises:
            RuntimeError: If the stack is empty.
        """
        if self.is_empty():
            raise RuntimeError("Stack is empty")
        return self.__data[-1]

    def clear(self) -> None:
        """Remove every element."""
        self.__data.clear()

    def get_at(self, i : int) -> T:
        """Return the element at position ``i``, counted from the bottom.

        Raises:
            IndexError: If ``i`` is not within ``0 <= i < count()``.
        """
        if not 0 <= i < len(self.__data):
            raise IndexError(f"Stack index {i} is out of range for depth {len(self.__data)}")
        return self.__data[i]

    def count(self) -> int:
        """Return the number of elements currently on the stack."""
        return len(self.__data)

    def is_empty(self) -> bool:
        """Return True when the stack holds no elements."""
        return not self.__data

    def trim(self) -> None:
        """Release unused storage.

        Kept for interface compatibility: the backing list never holds more
        than the live elements, so there is nothing to release.
        """
class RpnStackEmptyError(Exception):
    """Raised when an operation needs more values than the stack holds.

    Attributes:
        stack_depth: Number of values on the stack when the error occurred.
        required_depth: Number of values the operation needed.
        message: Human-readable description, also passed to ``Exception``.
    """

    def __init__(self, stack_depth, required_depth):
        self.stack_depth = stack_depth
        self.required_depth = required_depth
        if stack_depth > 0:
            text = f"Stack has depth {stack_depth} but operation required stack depth {required_depth}"
        else:
            text = "Stack was empty"
        self.message = text
        super().__init__(text)
class RpnStackInvalidArgumentError(Exception):
    """Raised when a command or operator definition receives bad arguments.

    Attributes:
        message: Human-readable description, also passed to ``Exception``.
    """

    def __init__(self, message):
        super().__init__(message)
        self.message = message
class RpnEvaluationStack:
    """Value stack that applies callables as RPN operators.

    An operator consumes as many values from the top of the stack as it has
    positional parameters and, unless it returns ``None``, pushes its result.
    """

    # Attribute a ``*args`` callable must carry to declare how many values it
    # consumes, since that cannot be read from its signature.
    PARAMETER_COUNT_MARKER = "parameter_count"
    __stack : Stack[Any]

    def __init__(self):
        self.__stack = Stack()

    def ensure_depth(self, depth):
        """Raise ``RpnStackEmptyError`` unless the stack holds at least ``depth`` values."""
        if depth > self.__stack.count():
            raise RpnStackEmptyError(self.__stack.count(), depth)

    def push(self, value):
        """Push ``value`` onto the stack."""
        self.__stack.push(value)

    @staticmethod
    def __arity(callable : Callable[..., Any]) -> int:
        """Return how many stack values ``callable`` consumes.

        Only positional parameters are counted. Keyword-only parameters,
        such as the bound argument ``functools.partial(f, kw=...)`` leaves
        in the signature, are supplied by the callable itself and must not
        be popped from the stack.

        Raises:
            TypeError: If the callable takes ``*args`` but does not carry
                the ``PARAMETER_COUNT_MARKER`` attribute.
        """
        from inspect import signature, Parameter
        params = signature(callable).parameters.values()
        if any(p.kind == Parameter.VAR_POSITIONAL for p in params):
            marker = RpnEvaluationStack.PARAMETER_COUNT_MARKER
            if not hasattr(callable, marker):
                raise TypeError(f"Variadic operator must declare its arity via the '{marker}' attribute")
            return getattr(callable, marker)
        positional = (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD)
        return sum(1 for p in params if p.kind in positional)

    def operator(self, callable : Callable[..., Any]):
        """Apply ``callable`` to the top values of the stack.

        The values are passed in the order they were pushed (deepest first).
        A result other than ``None`` is pushed back onto the stack.

        Raises:
            RpnStackEmptyError: If the stack holds fewer values than the
                operator consumes; the stack is left untouched in that case.
        """
        arg_count = self.__arity(callable)
        self.ensure_depth(arg_count)
        operands = [self.__stack.pop() for _ in range(arg_count)]
        operands.reverse()
        result = callable(*operands)
        # Identity test: a result with a custom __ne__ must still be pushed.
        if result is not None:
            self.push(result)

    def top(self) -> Any:
        """Return the top value without removing it."""
        self.ensure_depth(1)
        return self.__stack.peek()

    def clear(self) -> None:
        """Remove every value from the stack."""
        self.__stack.clear()

    def pop(self) -> Any:
        """Remove and return the top value."""
        self.ensure_depth(1)
        return self.__stack.pop()

    def get_at(self, i : int) -> Any:
        """Return the value at position ``i``, counted from the bottom of the stack.

        Raises:
            IndexError: If ``i`` is negative.
            RpnStackEmptyError: If the stack holds ``i`` or fewer values.
        """
        if i < 0:
            raise IndexError(f"Stack index {i} is negative")
        # Reading index i needs i + 1 values on the stack.
        self.ensure_depth(i + 1)
        return self.__stack.get_at(i)

    def stack_depth(self) -> int:
        """Return the number of values on the stack."""
        return self.__stack.count()
def select(left, right, if_true, if_false, comparison):
    """Return ``if_true`` when ``comparison(left, right)`` is truthy, else ``if_false``."""
    if comparison(left, right):
        return if_true
    return if_false
def range_select(left, right, if_greater, if_equal, if_less):
    """Three-way select on how ``left`` compares with ``right``.

    Returns ``if_equal`` when ``left == right``, ``if_greater`` when
    ``left > right`` and ``if_less`` in every other case.
    """
    if left == right:
        chosen = if_equal
    elif left > right:
        chosen = if_greater
    else:
        chosen = if_less
    return chosen
class RpnComputer:
    """Line-oriented Reverse Polish Notation interpreter.

    Each input line is either a command (``CLEAR``, ``RPN_OP``, ``HELP``, ...),
    the word ``quit``, or a space/comma separated list of constants and
    operators that are applied to an ``RpnEvaluationStack``. Operator, alias
    and command names are matched case-insensitively. Text after a
    semicolon on a line is treated as a comment.
    """

    __evaluator : RpnEvaluationStack
    __operators : Dict[str, Callable[..., Any]]
    __commands : Dict[str, Tuple[Callable[..., Any], str]]
    __aliases : Dict[str, str]
    __in : Iterable[str]
    __out : TextIO
    __err : TextIO

    # Built-in operators; every instance starts with its own copy.
    __DefaultOperators : Dict[str, Callable[..., Any]] = IgnoreCaseDict({
        "+" : operator.add,
        "-" : operator.sub,
        "*" : operator.mul,
        "/" : operator.floordiv,
        "^" : operator.pow,
        "==" : partial(select, comparison=operator.eq),
        "!=" : partial(select, comparison=operator.ne),
        ">" : partial(select, comparison=operator.gt),
        ">=" : partial(select, comparison=operator.ge),
        "<" : partial(select, comparison=operator.lt),  # target of the 'select_lt' alias
        "<=" : partial(select, comparison=operator.le),
        "<=>" : range_select,
        "pop" : lambda popped_value: None,  # takes 1 value, but pushes none
    })

    # Built-in aliases (alias name -> operator name).
    __DefaultAliases : Dict[str, str] = IgnoreCaseDict({
        "select_eq" : "==",
        "select_neq" : "!=",
        "select_gt" : ">",
        "select_gte" : ">=",
        "select_lt" : "<",
        "select_lte" : "<=",
        "select_range" : "<=>",
        "add" : "+",
        "sub" : "-",
        "div" : "/",
        "mul" : "*",
        "pow" : "^",
    })

    # Built-in commands: name -> (handler(computer, args), help text).
    __DefaultCommands : Dict[str, Tuple[Callable[..., Any], str]] = IgnoreCaseDict({
        "CLEAR" : (lambda e, _: e.__evaluator.clear(), "'CLEAR' - Clear the stack"),
        "PY_OP" : (lambda e, a: e.add_python_operator(a), "'PY_OP name [args]: expr' - Create a python operator"),
        "RPN_OP" : (lambda e, a: e.add_rpn_operator(a), "'RPN_OP name [args]: expr' - Create an RPN operator"),
        "ALIAS" : (lambda e, a: e.add_alias(a), "'ALIAS target name' - Create a new alias from 'name' to 'target'"),
        "TOP" : (lambda e, _: e.show_result(e.__evaluator.top()), "'TOP' - Show the top of the stack"),
        "AT" : (lambda e, a: e.get_at(a), "'AT point' - Show the value at 'point'"),
        "DEPTH" : (lambda e, _: e.write_out(f"Depth: {e.__evaluator.stack_depth()}"), "'DEPTH' - Show the depth of the stack"),
        "STACK" : (lambda e, a: e.show_stack(a), "'STACK [lo] [hi]' - Show the stack state, optionally from lower bound 'lo' to upper bound 'hi'. When ommitted, lo is 0 and hi is the max depth"),
        "HELP" : (lambda e, a: e.show_help(a), "'HELP [-o/-od/-a/-ad]' - Show help for an option, or show help for the options")
    })

    from sys import stdin, stdout, stderr

    def __init__(self, _in=stdin, _out=stdout, _err=stderr, write_in_to_out=False, cursor=False):
        """Create a computer.

        Args:
            _in: Iterable of input lines (a text stream or a list of strings).
            _out: Stream for results, the prompt and echoed input.
            _err: Stream for error and debug messages.
            write_in_to_out: Echo every input line to ``_out`` before running it.
            cursor: Write a ``"> "`` prompt to ``_out`` before each line.
        """
        cls = type(self)
        self.__operators = deepcopy(cls.__DefaultOperators)
        self.__commands = deepcopy(cls.__DefaultCommands)
        self.__aliases = deepcopy(cls.__DefaultAliases)
        self.__evaluator = RpnEvaluationStack()
        self.__write_in_to_out = write_in_to_out
        self.__in = _in
        self.__out = _out
        self.__out_color = Colors.CYAN
        self.__err = _err
        self.__err_color = Colors.RED
        self.__debug_color = Colors.GREEN
        self.__cursor = cursor

    @dataclass
    class _SignatureInfo:
        """Parsed operator signature: the operator name and its argument names."""
        name : str
        arg_names : List[str]

        def __str__(self) -> str:
            return f"{self.name} {' '.join(self.arg_names)}"

    @staticmethod
    def __parse_sig(args : str) -> Tuple[_SignatureInfo, str]:
        """Parse ``"name [args]: expr"`` into a signature and the expression text.

        Raises:
            RpnStackInvalidArgumentError: If the text does not contain exactly
                one colon, or no operator name precedes the colon.
        """
        parts = (args or "").split(":")
        if len(parts) != 2:
            raise RpnStackInvalidArgumentError(f"Operator definition expected 1 colon delimiting signature from expression (but found {len(parts) - 1})")
        signature_text, expression = parts
        # split() with no argument tolerates repeated spaces and yields an
        # empty argument list for operators that take no arguments.
        tokens = signature_text.split()
        if not tokens:
            raise RpnStackInvalidArgumentError("Operator definition is missing a name before the colon")
        name, *arg_names = tokens
        return RpnComputer._SignatureInfo(name, arg_names), expression

    class _RpnOperator:
        """User-defined operator whose body is itself an RPN expression."""

        class OperationType(Enum):
            Constant = 0
            Operator = 1
            ResolveVariable = 2

        __rpnComputer : RpnComputer
        __commands : List[Tuple[OperationType, Any]]
        __signature : _SignatureInfo

        def __init__(self, computer : RpnComputer, sig : _SignatureInfo, cmds : str):
            """Compile ``cmds`` into a list of (operation type, payload) steps.

            Each token is tried, in order, as an integer constant, an operator
            known to ``computer``, one of the signature's argument names and
            finally a floating point constant.

            Raises:
                RpnStackInvalidArgumentError: If a token matches none of these.
            """
            self.__rpnComputer = computer
            self.__signature = sig
            self.__commands = []
            OperationType = RpnComputer._RpnOperator.OperationType
            for token in splitmany(cmds, ",", " "):
                if token.strip() == "":
                    continue
                try:
                    self.__commands.append((OperationType.Constant, int(token)))
                    continue
                except ValueError:
                    pass
                if computer.is_valid_operator(token):
                    self.__commands.append((OperationType.Operator, token))
                elif token in sig.arg_names:
                    self.__commands.append((OperationType.ResolveVariable, sig.arg_names.index(token)))
                else:
                    try:
                        self.__commands.append((OperationType.Constant, float(token)))
                    except ValueError:
                        raise RpnStackInvalidArgumentError(f"Invalid argument '{token}' for func '{sig}'") from None

        def run(self, variables : List[int]) -> Any:
            """Evaluate the compiled expression on a fresh stack and return its top value.

            Args:
                variables: Argument values, in the order of the signature's argument names.
            """
            OperationType = RpnComputer._RpnOperator.OperationType
            evaluator = RpnEvaluationStack()
            for kind, payload in self.__commands:
                if kind == OperationType.Constant:
                    evaluator.push(payload)
                elif kind == OperationType.Operator:
                    # Looked up at run time, so the current definition is used.
                    evaluator.operator(self.__rpnComputer.get_operator(payload))
                elif kind == OperationType.ResolveVariable:
                    evaluator.push(variables[payload])
            return evaluator.top()

    def __resolve_alias(self, name : str) -> str:
        """Follow the alias chain starting at ``name`` and return the final target.

        Raises:
            RpnStackInvalidArgumentError: If the chain loops back on itself.
        """
        visited = set()
        while name in self.__aliases:
            folded = name.lower()
            if folded in visited:
                raise RpnStackInvalidArgumentError(f"Alias '{name}' is part of an alias cycle")
            visited.add(folded)
            name = self.__aliases[name]
        return name

    def is_valid_operator(self, cmd : str) -> bool:
        """Return True if ``cmd`` names an operator, directly or through aliases."""
        return self.__resolve_alias(cmd) in self.__operators

    def get_operator(self, cmd : str) -> Callable:
        """Return the callable for operator ``cmd``, resolving aliases first."""
        return self.__operators[self.__resolve_alias(cmd)]

    def get_at(self, args : str):
        """Handle ``AT point``: show the stack value at index ``point``.

        Raises:
            RpnStackInvalidArgumentError: If the argument is missing, is not
                an integer, or lies outside the stack.
        """
        try:
            i = int(args)
        except (TypeError, ValueError):
            # TypeError covers a missing argument (args is None).
            raise RpnStackInvalidArgumentError("'AT' expected a single integer as an argument, but did not receive one") from None
        if i < 0 or i > self.__evaluator.stack_depth() - 1:
            raise RpnStackInvalidArgumentError(f"'AT' received an index of {i}, but this is outside the bounds of the stack (lo=0, hi={max(0, self.__evaluator.stack_depth() -1)})")
        self.show_result(self.__evaluator.get_at(i))

    def add_alias(self, args : str) -> None:
        """Handle ``ALIAS target name``: make ``name`` an alias for ``target``.

        Raises:
            RpnStackInvalidArgumentError: If not exactly two arguments are given.
        """
        split = (args or "").split()
        if len(split) != 2:
            raise RpnStackInvalidArgumentError(f"Command 'ALIAS target name' expects 2 arguments but received {len(split)}")
        target, name = split
        if name == target:
            self.write_err(f"Cannot alias '{name}' to itself")
            return
        if name in self.__aliases:
            self.write_err(f"Cannot add alias '{name}' as it already exists (to target '{self.__aliases[name]}')")
            return
        self.__aliases[name] = target

    def add_rpn_operator(self, args : str):
        """Handle ``RPN_OP name [args]: expr``: define an operator written in RPN."""
        sig, cmd = type(self).__parse_sig(args)
        rpnOperator = RpnComputer._RpnOperator(self, sig, cmd)

        def result(*values):
            return rpnOperator.run(list(values))

        # A *args callable cannot reveal its arity, so it is recorded explicitly.
        setattr(result, RpnEvaluationStack.PARAMETER_COUNT_MARKER, len(sig.arg_names))
        self.__operators[sig.name] = result

    def add_python_operator(self, args : str):
        """Handle ``PY_OP name [args]: expr``: define an operator from a Python expression.

        The expression is compiled as the body of a lambda whose parameters
        are the signature's argument names; the ``math`` module is in scope.
        A compilation failure is reported on the error stream and no
        operator is registered.
        """
        import math
        sig, cmd = type(self).__parse_sig(args)
        cmd = cmd.strip()
        try:
            # SECURITY: eval executes arbitrary Python supplied on the input
            # stream. Only feed this interpreter input you trust.
            res = eval(f"lambda {','.join(sig.arg_names)}: {cmd}", {"math": math})
        except Exception as e:
            self.write_err(f"Invalid python operator expression '{cmd}'")
            self.write_err(f"[PY]> {str(e)}")
            return
        self.__operators[sig.name] = res

    def write_debug(self, debug):
        """Write ``debug`` to the error stream using the debug prefix and colour."""
        self.__err.write(self.__format("[RPN DEBUG]> ", self.__debug_color, debug))

    def __format(self, prefix, color, message):
        """Prefix every non-blank line of ``message`` and wrap the text in ``color``."""
        return color + prefix + f"\n{prefix}".join(filter(lambda x: x.strip() != "", message.split("\n"))) + Colors.RESET + "\n"

    def write_err(self, err, close=False):
        """Write ``err`` to the error stream; terminate the process when ``close`` is true."""
        self.__err.write(self.__format("[RPN ERROR]> ", self.__err_color, err))
        if close:
            raise SystemExit(-1)

    def write_out(self, text):
        """Write ``text`` to the output stream using the result prefix and colour."""
        self.__out.write(self.__format("[RPN]> ", self.__out_color, text))

    def show_result(self, result):
        """Write ``result`` to the output stream as ``Result: ...``."""
        self.write_out(f"Result: {result}")

    def show_help(self, opts):
        """Handle ``HELP [option]``: show general help or the list selected by ``opts``."""
        def _format_list(vals):
            vals = list(vals)
            return " * " + "\n * ".join(vals) if len(vals) > 0 else " [None]"

        def _get_aliases(v):
            aliases = [name for name, target in self.__aliases.items() if target == v]
            return f" [Aliases: {', '.join(aliases)}]" if len(aliases) > 0 else ""

        opts = (opts or "").strip()
        if opts == "":
            s = (
                "John's Reverse Polish Notation Engine: \n"
                " * The engine is case insensitive\n"
                " * You can have one command per line, or one or more operators/constants, space or comma seperated\n"
                " * Type help -c to view all commands\n"
                " * Type help -o to view all operators, or help -od to view only default operators\n"
                " * Type help -a to view all aliases, or help -ad to view only default aliases\n"
                " * Type help <command> to view help for a specific command\n"
                " * Add a semicolon anywhere to start a comment\n"
            )
        elif opts == "-c":
            s = "All engine commands: \n" + _format_list(map(lambda x: x[1], self.__commands.values()))
        elif opts == "-o" or opts == "-od":
            s = "All default engine operators: \n" + _format_list((x + _get_aliases(x) for x in self.__DefaultOperators.keys()))
            if opts == "-o":
                s += "\nAll user-defined engine operators: \n" + _format_list((x + _get_aliases(x) for x in (self.__operators.keys() - self.__DefaultOperators.keys())))
        elif opts == "-a" or opts == "-ad":
            s = "All default engine aliases: \n" + _format_list((f"{x} -> {self.__aliases[x]}" for x in self.__DefaultAliases.keys()))
            if opts == "-a":
                s += "\nAll user-defined engine aliases: \n" + _format_list((f"{x} -> {self.__aliases[x]}" for x in (self.__aliases.keys() - self.__DefaultAliases.keys())))
        elif opts in self.__commands:
            s = f" * {self.__commands[opts][1]}\n"
        else:
            self.write_err(f"Unrecognised option '{opts}' for HELP command")
            return
        self.write_debug(s)

    def show_stack(self, maybe_range):
        """Handle ``STACK [lo] [hi]``: list stack values with indices in ``[lo, hi)``.

        ``lo`` defaults to 0 and ``hi`` to the current depth. Invalid bounds
        are reported on the error stream and nothing is listed.
        """
        depth = self.__evaluator.stack_depth()
        bounds = (maybe_range or "").split()
        if len(bounds) > 2:
            self.write_err(f"'STACK [lo] [hi]' expects at most 2 arguments but received {len(bounds)}")
            return
        try:
            lo = int(bounds[0]) if len(bounds) >= 1 else 0
            hi = int(bounds[1]) if len(bounds) == 2 else depth
        except ValueError:
            self.write_err("'STACK' expected integer bounds, but did not receive them")
            return
        if lo < 0 or lo > hi:
            self.write_err(f"Invalid stack lo point ({lo})")
            return
        if hi < 0 or hi > depth:
            self.write_err(f"Invalid stack hi point ({hi})")
            return
        self.write_debug("START OF STACK")
        for i in range(lo, hi):
            self.write_debug(f"{i}: {self.__evaluator.get_at(i)}")
        self.write_debug("END OF STACK")

    def get_input(self):
        """Yield input lines, echoing each to the output stream when configured to."""
        for inp in self.__in:
            if self.__write_in_to_out:
                # Lines read from a stream already end in a newline.
                self.__out.write(inp if inp.endswith("\n") else inp + "\n")
            yield inp

    def __execute_token(self, token : str) -> None:
        """Apply one operator token, or push one constant token."""
        name = self.__resolve_alias(token)
        if name not in self.__operators:
            try:
                self.__evaluator.push(float(token))
            except ValueError:
                self.write_err(f"'{token}' was not recognised as an alias, command, operator, or valid constant")
            return
        had_value = self.__evaluator.stack_depth() > 0
        previous_top = self.__evaluator.top() if had_value else None
        try:
            self.__evaluator.operator(self.__operators[name])
        except (ArithmeticError, TypeError, ValueError) as e:
            # Operator bodies can fail (division by zero, math domain errors,
            # wrong operand types); report it instead of ending the session.
            # The operands were already popped when the operator ran.
            self.write_err(f"Operator '{token}' failed: {e}")
            return
        # When an operator empties the stack, show the value that was on top.
        if had_value and self.__evaluator.stack_depth() == 0:
            self.show_result(previous_top)

    def __execute_line(self, line : str) -> bool:
        """Run one non-empty, comment-stripped line. Return True when it was ``quit``."""
        if " " in line:
            cmd, args = splitone(line, " ")
        else:
            cmd, args = line, None
        cmd = self.__resolve_alias(cmd)
        if cmd.lower() == "quit":
            # Quit must succeed on an empty stack too.
            if self.__evaluator.stack_depth() > 0:
                self.show_result(self.__evaluator.top())
            return True
        if cmd in self.__commands:
            self.__commands[cmd][0](self, args)
            return False
        for token in splitmany(line, " ", ","):
            # "1, 2" splits into "1", "" and "2"; skip the empty piece.
            if token != "":
                self.__execute_token(token)
        return False

    def run(self):
        """Read and execute input lines until ``quit`` or the end of input.

        Stack and argument errors are written to the error stream and the
        loop continues with the next line.
        """
        def _next_line():
            if self.__cursor:
                self.__out.write("> ")
                self.__out.flush()

        _next_line()
        for raw in self.get_input():
            line = raw[:-1] if raw.endswith("\n") else raw
            line = line.split(";")[0].strip()
            if line != "":
                try:
                    if self.__execute_line(line):
                        return
                except (RpnStackEmptyError, RpnStackInvalidArgumentError) as e:
                    self.write_err(e.message)
            _next_line()
# Small sample session: defines one RPN operator and one Python operator.
cmds = """RPN_OP square a: a,a,*
PY_OP sqrt a: math.sqrt(a)
5
5
square
sqrt
"""

# Demo program: counts the real roots of three quadratics via the discriminant.
example_program = """
RPN_OP discrim a b c: b 2 ^ a c * 4 * -
RPN_OP numroots a b c: a b c discrim 0 2 1 0 <=>
; x^2
1 ; a
0 ; b
0 ; c
numroots
pop ; expected: 1
; x^2 - x + 9
1 ; a
-1 ; b
9 ; c
numroots
pop ; expected: 0
; x^2 + 10x + 5
1 ; a
10 ; b
5 ; c
numroots
pop ; expected: 2
"""

from sys import stdin


def main():
    """Run an interactive RPN session that reads commands from standard input."""
    os.system("")  # required for colors to work on windows
    source, redir = stdin, False
    # To run the bundled demo instead of reading stdin, use:
    #     source, redir = example_program.split("\n"), True
    computer : RpnComputer = RpnComputer(_in=source, write_in_to_out=redir, cursor=True)
    computer.run()


# Start the REPL only when executed as a script, so importing this module
# does not block on standard input.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment