Skip to content

Instantly share code, notes, and snippets.

@noxasaxon
Created February 27, 2021 22:40

Revisions

  1. noxasaxon created this gist Feb 27, 2021.
    281 changes: 281 additions & 0 deletions testing.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,281 @@
    import timeit

    # root_object = {
    # "artifacts": {
    # "event": {
    # "details": {
    # "firstname": "Sterling",
    # "middlename": "Malory",
    # "lastname": "Archer",
    # "vault_test" : "vault:socless_vault_tests.txt"
    # }
    # }
    # }
    # }
    # vault_pattern = re.compile(r"(vault:)(\S+(?=\s|$))(.*)")
    import_module = """
    import re
    VAULT_TOKEN = "vault:"
    PATH_TOKEN = "$."
    CONVERSION_TOKEN = "!"
    from typing import Any
    import json
    from jinja2.nativetypes import NativeEnvironment
    from jinja2 import select_autoescape, StrictUndefined
    def fetch_from_vault(string_thing, content_only=True):
    return string_thing
    # Jinja Environment Configuration
    #! this fails to escape <script>, escaping works with Environment
    jinja_env = NativeEnvironment(
    autoescape=select_autoescape(
    ["html", "xml"], default_for_string=True, default=True
    ),
    variable_start_string="{", # this defines the start tokens for a jinja template
    variable_end_string="}", # this is the end token for a jinja template
    undefined=StrictUndefined, # This configures Jinjas behaviour when a template user provides an undefined reference
    ### StrictUndefined here ensures that if the user references something that
    # Doesn't actually exist in the context, an error is raised
    # Without StrictUndefined, invalid references fail silently
    # More on undefined types here https://jinja.palletsprojects.com/en/2.11.x/api/#undefined-types
    )
    # Define Custom Filters
    def maptostr(target_list):
    return [str(each) for each in target_list]
    def vault(vault_id: str):
    # A custom jinja Function which returns the content of a socless vault
    # we expect it to be called as {vault( context.vault_id) } and return the same value
    # that current vault:vault-id would return
    return fetch_from_vault(vault_id, content_only=True)
    def fromjson(string_json: str) -> Any:
    # This is a custom jinja Filter which expects stringified json and returns
    # the output of calling json.loads on it.
    return json.loads(string_json)
    # Add Custom Functions
    custom_functions = {"vault": vault, "fromjson": fromjson}
    # Add Custom Filters
    custom_filters = {"maptostr": maptostr, **custom_functions}
    # Register Custom Filters
    jinja_env.filters.update(custom_filters)
    # Register Custom Functions
    jinja_env.globals.update(custom_functions)
    def render_jinja_from_string(template_string: str, root_object: dict) -> Any:
    template_obj = jinja_env.from_string(template_string)
    return template_obj.render(context=root_object)
    ############################################################################
    class OLDParameterResolver:
    def __init__(self, root_obj):
    self.root_obj = root_obj
    def resolve_jsonpath(self, path):
    _pre, _sep, post = path.partition(PATH_TOKEN)
    keys = post.split(".")
    obj_copy = self.root_obj.copy()
    for key in keys:
    try:
    value = obj_copy.get(key)
    except AttributeError:
    raise SoclessException(
    f"Unable to resolve key {key}, parent object does not exist. Full path: {path}"
    )
    if isinstance(value, str) and value.startswith(VAULT_TOKEN):
    actual = self.resolve_vault_path(value)
    else:
    actual = value
    obj_copy = actual
    return obj_copy
    def resolve_vault_path(self, path):
    _, __, file_id = path.partition(VAULT_TOKEN)
    data = fetch_from_vault(file_id, content_only=True)
    return data
    def resolve_reference(self, reference_path):
    if not isinstance(reference_path, str):
    if isinstance(reference_path, dict):
    resolved_dict = {}
    for key, value in list(reference_path.items()):
    resolved_dict[key] = self.resolve_reference(value)
    return resolved_dict
    elif isinstance(reference_path, list):
    resolved_list = []
    for item in reference_path:
    resolved_list.append(self.resolve_reference(item))
    return resolved_list
    else:
    return reference_path
    if reference_path.startswith(PATH_TOKEN):
    reference, _, conversion = reference_path.partition(CONVERSION_TOKEN)
    resolved = self.resolve_jsonpath(reference)
    elif reference_path.startswith(VAULT_TOKEN):
    reference, _, conversion = reference_path.partition(CONVERSION_TOKEN)
    resolved = self.resolve_vault_path(reference)
    else:
    return reference_path
    if conversion:
    resolved = self.apply_conversion_from(resolved, conversion)
    return resolved
    def resolve_parameters(self, parameters):
    actual_params = {}
    for parameter, reference in list(parameters.items()):
    actual_params[parameter] = self.resolve_reference(reference)
    return actual_params
    def apply_conversion_from(self, data, conversion):
    print(data)
    print(conversion)
    if conversion == "json":
    return json.loads(data)
    class SoclessException(Exception):
    pass
    class ParameterResolver:
    def __init__(self, root_obj):
    self.root_obj = root_obj
    def resolve_reference(self, reference_path):
    if isinstance(reference_path, str):
    return resolve_string_parameter(reference_path, self.root_obj)
    elif isinstance(reference_path, dict):
    resolved_dict = {}
    for key, value in list(reference_path.items()):
    resolved_dict[key] = self.resolve_reference(value)
    return resolved_dict
    elif isinstance(reference_path, list):
    resolved_list = []
    for item in reference_path:
    resolved_list.append(self.resolve_reference(item))
    return resolved_list
    else:
    return reference_path
    def resolve_parameters(self, parameters):
    actual_params = {}
    for parameter, reference in parameters.items():
    actual_params[parameter] = self.resolve_reference(reference)
    return actual_params
    def convert_deprecated_vault_to_template(vault_reference) -> str:
    reference, _, conversion = vault_reference.partition(CONVERSION_TOKEN)
    _, _, file_id = reference.partition(VAULT_TOKEN)
    template = f"vault('{file_id}')"
    if conversion:
    template = template + " |fromjson"
    return "{" + template + "}"
    def convert_legacy_reference_to_template(reference_path: str) -> str:
    try:
    template = reference_path
    if template.startswith(PATH_TOKEN):
    _, _, conversion = template.partition(CONVERSION_TOKEN)
    template = f"context{template[1:]}"
    if conversion:
    template = template + " |fromjson"
    template = "{" + template + "}"
    elif template.startswith(VAULT_TOKEN):
    template = convert_deprecated_vault_to_template(template)
    return template
    except (TypeError, KeyError) as e:
    raise SoclessException(
    f"Unable to convert reference type {type(reference_path)} to template - {e}"
    )
    def resolve_string_parameter(parameter: str, root_object: dict) -> Any:
    template = convert_legacy_reference_to_template(parameter)
    resolved = render_jinja_from_string(template, root_object)
    if isinstance(resolved, str):
    # if jsonpath renders into something with a vault_token, it needs to run through jinja again
    if resolved.startswith(VAULT_TOKEN):
    new_template_string = convert_deprecated_vault_to_template(resolved)
    return render_jinja_from_string(new_template_string, root_object)
    ## autoescaping is currently disabled, this line may not be necessary
    # resolved = resolved.replace("&#34;", '"').replace("&#39;", "'")
    return resolved
    ###################_BEGIN_SETUP_##################
    root_obj = {
    "artifacts": {
    "event": {
    "details": {
    "firstname": "Sterling",
    "middlename": "Malory",
    "lastname": "Archer",
    "vault_test" : "vault:socless_vault_tests.txt"
    }
    }
    }
    }
    parameters = {
    "firstname": "$.artifacts.event.details.firstname",
    "lastname": "$.artifacts.event.details.lastname",
    "middlename": "Malory",
    "vault.txt": "vault:socless_vault_tests.txt",
    # "vault.json": "vault:socless_vault_tests.json!json",
    "acquaintances": [
    {
    "firstname": "$.artifacts.event.details.middlename",
    "lastname": "$.artifacts.event.details.lastname"
    }
    ]
    }
    """
    testcode = """
    resolver = OLDParameterResolver(root_obj)
    output = resolver.resolve_parameters(parameters)
    """


    print(min(timeit.repeat(stmt=testcode, setup=import_module, repeat=25, number=1)))


    ## resolver = OLDParameterResolver(root_obj)
    # repeat=25, number=1
    # 0.000019248000000082754
    # -----
    # repeat=2, number=10000
    # 0.12176976900000003
    # -----
    # repeat=5, number=100
    # .0014764200000000338
    # -----
    # repeat=5, number=1000
    # 0.012429356000000003
    # ___________________
    ## resolver = ParameterResolver(root_obj)
    # repeat=25, number=1
    # 0.0031444959999999966
    # -----
    # repeat=5, number=100
    # 0.30228340200000003
    # -----
    # repeat=5, number=1000
    # 3.0495730029999995