Created
December 31, 2023 08:00
-
-
Save schlarpc/5d66b1d9aa4abd30dca974de976116be to your computer and use it in GitHub Desktop.
Work-in-progress encoding of the AWS Step Functions states language in Python.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ast | |
import inspect | |
import hashlib | |
import json | |
import base64 | |
import random | |
import re | |
import itertools | |
import time | |
import uuid | |
import contextlib | |
import datetime | |
from typing import TypeVar, TypeAlias, Iterable, Optional, Union, Mapping, Sequence, overload | |
JSON: TypeAlias = Union["JSONObject", "JSONArray", "JSONNumber", str, bool, None] | |
JSONNumber: TypeAlias = int | float | |
JSONObject: TypeAlias = dict[str, JSON] | |
JSONArray: TypeAlias = list[JSON] | |
class StatesFlow:
    """Python-side stand-ins for flow-control states.

    Only ``wait`` is implemented; every other method raises
    ``NotImplementedError``.
    """

    @overload
    def wait(self, seconds: JSONNumber, /):
        ...

    @overload
    def wait(self, timestamp: datetime.datetime, /):
        ...

    def wait(self, value: Union[JSONNumber, datetime.datetime], /):
        """Block the calling thread for a duration or until a timestamp.

        Args:
            value: Either a number of seconds, or a ``datetime`` to wait
                until. A naive datetime is compared against the current UTC
                time; a timezone-aware datetime is compared against an aware
                "now". The wait is truncated to whole seconds.

        Raises:
            TypeError: If ``value`` is neither a number nor a ``datetime``.
            ValueError: If a negative number of seconds is given (raised by
                ``time.sleep``).
        """
        # TODO support injecting time
        if isinstance(value, datetime.datetime):
            now = datetime.datetime.now(datetime.timezone.utc)
            if value.utcoffset() is None:
                # Naive timestamps are interpreted as UTC, so compare against a
                # naive UTC "now"; mixing naive and aware values would raise.
                now = now.replace(tzinfo=None)
            # A timestamp already in the past leaves nothing to wait for, so
            # clamp instead of handing time.sleep() a negative duration.
            delta = max(0.0, (value - now).total_seconds())
        elif isinstance(value, (int, float)):
            delta = value
        else:
            raise TypeError(
                "wait() expects a number of seconds or a datetime, got "
                + type(value).__name__
            )
        time.sleep(int(delta))

    def succeed(self):
        """Not implemented yet."""
        raise NotImplementedError()

    def fail(self, *, cause: Optional[str] = None, error: Optional[str] = None):
        """Not implemented yet."""
        raise NotImplementedError()

    def map(self):
        """Not implemented yet."""
        raise NotImplementedError()

    @contextlib.contextmanager
    def parallel(self):
        """Not implemented yet."""
        raise NotImplementedError()
class StatesIntrinsics:
    """Pure-Python implementations of the ``States.*`` intrinsic functions.

    Numeric arguments are truncated with ``int()`` wherever an integer is
    required. Comments note the places where results are known to differ
    from the real service.
    """

    def array(self, *args: JSON) -> list[JSON]:
        """Return the positional arguments as a list."""
        return list(args)

    def array_partition(self, array: list[JSON], chunk_size: JSONNumber, /) -> list[list[JSON]]:
        """Split ``array`` into consecutive chunks of at most ``chunk_size`` items.

        Raises:
            ValueError: If ``chunk_size`` is less than 1.
        """
        if chunk_size < 1:
            raise ValueError("chunk_size must be at least 1")
        size = int(chunk_size)
        return [array[start:start + size] for start in range(0, len(array), size)]

    def array_contains(self, array: list[JSON], value: JSON) -> bool:
        """Return whether ``value`` is an element of ``array``."""
        return value in array

    def array_range(self, first: JSONNumber, last: JSONNumber, increment: JSONNumber, /) -> list[int]:
        """Return integers from ``first`` to ``last`` inclusive, stepping by ``increment``.

        Raises:
            ValueError: If ``increment`` truncates to zero (raised by ``range``).
        """
        step = int(increment)
        # range() excludes its stop value, so nudge the stop one past ``last``
        # in the direction of travel to make the end point inclusive.
        stop = int(last - 1) if step < 0 else int(last + 1)
        return list(range(int(first), stop, step))

    def array_get_item(self, array: list[JSON], index: JSONNumber, /) -> JSON:
        """Return the element of ``array`` at ``index`` (truncated to an int)."""
        return array[int(index)]

    def array_length(self, array: list[JSON]) -> int:
        """Return the number of elements in ``array``."""
        return len(array)

    def _array_unique_generator(self, array: list[JSON]) -> Iterable[JSON]:
        """Yield each distinct value of ``array`` the first time it appears."""
        seen: set[str] = set()
        for value in array:
            # JSON values may be unhashable (lists/dicts), so a canonical
            # serialisation with sorted keys serves as the identity.
            hashable = json.dumps(value, sort_keys=True)
            if hashable not in seen:
                seen.add(hashable)
                yield value

    def array_unique(self, array: list[JSON], /) -> list[JSON]:
        """Return ``array`` with duplicate values removed."""
        # does not return in same order as step functions
        return list(self._array_unique_generator(array))

    def base64_encode(self, value: str, /) -> str:
        """Return the standard Base64 encoding of ``value``'s UTF-8 bytes."""
        return base64.b64encode(value.encode("utf-8")).decode('ascii')

    def base64_decode(self, value: str, /) -> str:
        """Decode Base64 text (standard or URL-safe alphabet) to a string.

        Characters outside the Base64 alphabet, including ``=`` padding, are
        discarded before decoding. Bytes that are not valid UTF-8 are replaced
        rather than raising.
        """
        normalized = value.replace('+', '-').replace('/', '_')
        cleaned = re.sub(r'[^A-Za-z0-9_-]', '', normalized)
        # The cleanup above also strips "=" padding, which the decoder needs;
        # restore it so the length is a multiple of four again.
        cleaned += '=' * (-len(cleaned) % 4)
        return base64.urlsafe_b64decode(cleaned).decode('utf-8', errors='replace')

    def hash(self, data: str, algorithm: str, /) -> str:
        """Return the hex digest of ``data`` (UTF-8 encoded) under ``algorithm``.

        Raises:
            ValueError: If ``algorithm`` is not one of MD5, SHA-1, SHA-256,
                SHA-384 or SHA-512.
        """
        algorithms = {
            "MD5": hashlib.md5,
            "SHA-1": hashlib.sha1,
            "SHA-256": hashlib.sha256,
            "SHA-384": hashlib.sha384,
            "SHA-512": hashlib.sha512,
        }
        if algorithm not in algorithms:
            raise ValueError("unsupported hash algorithm: " + repr(algorithm))
        return algorithms[algorithm](data.encode("utf-8")).hexdigest()

    def json_merge(self, object1: JSONObject, object2: JSONObject, deep_merge: JSON = False, /) -> JSONObject:
        """Shallow-merge two objects; keys in ``object2`` win.

        Raises:
            ValueError: If ``deep_merge`` is exactly ``True``.
        """
        # deep_merge has no type validation, and only true raises as unimplemented
        if deep_merge is True:
            raise ValueError("deep merge is not supported")
        return {**object1, **object2}

    def string_to_json(self, value: str) -> JSON:
        """Parse ``value`` as JSON."""
        # returns null if input string is just whitespace
        if not value.strip():
            return None
        return json.loads(value)

    def math_random(self, start: JSONNumber, end: JSONNumber, seed: Optional[JSONNumber] = None, /) -> int:
        """Return a random integer between ``start`` and ``end``, both inclusive."""
        rng = random.Random()
        # same seed does not return same results as step functions (different algorithm)
        if seed is not None:
            rng.seed(int(seed))
        return rng.randint(int(start), int(end))

    def math_add(self, value1: JSONNumber, value2: JSONNumber, /) -> int:
        """Return the sum of the two values, each truncated to an int first."""
        return int(value1) + int(value2)

    def string_split(self, input_string: str, splitter: str, /) -> list[str]:
        """Split ``input_string`` on any single character found in ``splitter``."""
        # Each character of ``splitter`` is an independent delimiter.
        pattern = "|".join(re.escape(c) for c in splitter)
        parts = re.split(pattern, input_string)
        # empty strings are omitted from the result
        return [part for part in parts if part]

    def uuid(self, /) -> str:
        """Return a random (version 4) UUID as a lowercase string."""
        return str(uuid.uuid4()).lower()

    def _format_arg_to_string(self, arg: JSON) -> str:
        """Render one ``format`` argument as text.

        Strings pass through unquoted, numbers/booleans/null use their JSON
        spelling, lists are compact JSON, and top-level dicts are rendered as
        ``{key=value, ...}``.
        """
        if isinstance(arg, (int, float, bool)) or arg is None:
            return json.dumps(arg)
        elif isinstance(arg, str):
            return arg
        elif isinstance(arg, list):
            # yes, this means that dicts render differently when nested inside a list. it's weird.
            return json.dumps(arg, separators=(",", ":"))
        elif isinstance(arg, dict):
            pairs = (
                self._format_arg_to_string(k) + '=' + self._format_arg_to_string(v)
                for k, v in arg.items()
            )
            return '{' + ', '.join(pairs) + '}'
        raise NotImplementedError()

    def _format_template_compose(self, template: str, formatted_args: list[str]) -> str:
        """Substitute ``formatted_args`` into the ``{}`` placeholders of ``template``.

        A backslash makes the following token literal, so ``\\{`` and ``\\}``
        produce braces. A backslash at the very end of the template is dropped.

        Raises:
            ValueError: On an unmatched brace, on anything between ``{`` and
                ``}``, or when the number of placeholders differs from the
                number of arguments.
        """
        buffer = []
        escape_active = False
        interp_active = False
        # Index of the next unused argument; avoids re-slicing the list for
        # every placeholder.
        next_arg = 0
        # split template into "control characters" and "spans of non-control characters"
        # the control characters are processed by the state machine loop, while spans pass through
        parts = (match.group() for match in re.finditer(r'[^\\{}]+|.', template))
        for part in parts:
            if escape_active:
                buffer.append(part)
                escape_active = False
            elif part == "\\":
                escape_active = True
            elif part == "}":
                if not interp_active:
                    raise ValueError("matching '{' not found for '}'.")
                if next_arg >= len(formatted_args):
                    raise ValueError("number of arguments do not match the occurrences of {}")
                buffer.append(formatted_args[next_arg])
                next_arg += 1
                interp_active = False
            elif interp_active:
                # Only an empty "{}" is a valid placeholder.
                raise ValueError("matching '}' not found for '{'")
            elif part == "{":
                interp_active = True
            else:
                buffer.append(part)
        if interp_active:
            raise ValueError("matching '}' not found for '{'")
        if next_arg < len(formatted_args):
            raise ValueError("number of arguments do not match the occurrences of {}")
        return "".join(buffer)

    def format(self, template: str, /, *args: JSON) -> str:
        """Fill each ``{}`` in ``template`` with the matching argument, in order.

        Raises:
            ValueError: If the template is malformed or the placeholder count
                differs from ``len(args)``.
        """
        return self._format_template_compose(template, [self._format_arg_to_string(arg) for arg in args])
if __name__ == "__main__":
    # Ad-hoc check of the template formatter, run only when this file is
    # executed as a script so that importing the module has no side effects.
    # NOTE: this template ends with an unmatched "{", so format() raises
    # ValueError here and the print below is never reached.
    a = StatesIntrinsics().format("hello fat\\{{}\\}her{", 1)
    print(json.dumps(a))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment