Created
February 27, 2021 22:40
Revisions
-
noxasaxon created this gist
Feb 27, 2021 .There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,281 @@ import timeit # root_object = { # "artifacts": { # "event": { # "details": { # "firstname": "Sterling", # "middlename": "Malory", # "lastname": "Archer", # "vault_test" : "vault:socless_vault_tests.txt" # } # } # } # } # vault_pattern = re.compile(r"(vault:)(\S+(?=\s|$))(.*)") import_module = """ import re VAULT_TOKEN = "vault:" PATH_TOKEN = "$." CONVERSION_TOKEN = "!" from typing import Any import json from jinja2.nativetypes import NativeEnvironment from jinja2 import select_autoescape, StrictUndefined def fetch_from_vault(string_thing, content_only=True): return string_thing # Jinja Environment Configuration #! this fails to escape <script>, escaping works with Environment jinja_env = NativeEnvironment( autoescape=select_autoescape( ["html", "xml"], default_for_string=True, default=True ), variable_start_string="{", # this defines the start tokens for a jinja template variable_end_string="}", # this is the end token for a jinja template undefined=StrictUndefined, # This configures Jinjas behaviour when a template user provides an undefined reference ### StrictUndefined here ensures that if the user references something that # Doesn't actually exist in the context, an error is raised # Without StrictUndefined, invalid references fail silently # More on undefined types here https://jinja.palletsprojects.com/en/2.11.x/api/#undefined-types ) # Define Custom Filters def maptostr(target_list): return [str(each) for each in target_list] def vault(vault_id: str): # A custom jinja Function which returns the content of a socless vault # we expect it to be called as {vault( context.vault_id) } and return the same value # that current vault:vault-id would return return fetch_from_vault(vault_id, content_only=True) def fromjson(string_json: str) -> Any: # This is a custom jinja Filter which expects stringified json and returns # the output of calling json.loads on it. return json.loads(string_json) # Add Custom Functions custom_functions = {"vault": vault, "fromjson": fromjson} # Add Custom Filters custom_filters = {"maptostr": maptostr, **custom_functions} # Register Custom Filters jinja_env.filters.update(custom_filters) # Register Custom Functions jinja_env.globals.update(custom_functions) def render_jinja_from_string(template_string: str, root_object: dict) -> Any: template_obj = jinja_env.from_string(template_string) return template_obj.render(context=root_object) ############################################################################ class OLDParameterResolver: def __init__(self, root_obj): self.root_obj = root_obj def resolve_jsonpath(self, path): _pre, _sep, post = path.partition(PATH_TOKEN) keys = post.split(".") obj_copy = self.root_obj.copy() for key in keys: try: value = obj_copy.get(key) except AttributeError: raise SoclessException( f"Unable to resolve key {key}, parent object does not exist. Full path: {path}" ) if isinstance(value, str) and value.startswith(VAULT_TOKEN): actual = self.resolve_vault_path(value) else: actual = value obj_copy = actual return obj_copy def resolve_vault_path(self, path): _, __, file_id = path.partition(VAULT_TOKEN) data = fetch_from_vault(file_id, content_only=True) return data def resolve_reference(self, reference_path): if not isinstance(reference_path, str): if isinstance(reference_path, dict): resolved_dict = {} for key, value in list(reference_path.items()): resolved_dict[key] = self.resolve_reference(value) return resolved_dict elif isinstance(reference_path, list): resolved_list = [] for item in reference_path: resolved_list.append(self.resolve_reference(item)) return resolved_list else: return reference_path if reference_path.startswith(PATH_TOKEN): reference, _, conversion = reference_path.partition(CONVERSION_TOKEN) resolved = self.resolve_jsonpath(reference) elif reference_path.startswith(VAULT_TOKEN): reference, _, conversion = reference_path.partition(CONVERSION_TOKEN) resolved = self.resolve_vault_path(reference) else: return reference_path if conversion: resolved = self.apply_conversion_from(resolved, conversion) return resolved def resolve_parameters(self, parameters): actual_params = {} for parameter, reference in list(parameters.items()): actual_params[parameter] = self.resolve_reference(reference) return actual_params def apply_conversion_from(self, data, conversion): print(data) print(conversion) if conversion == "json": return json.loads(data) class SoclessException(Exception): pass class ParameterResolver: def __init__(self, root_obj): self.root_obj = root_obj def resolve_reference(self, reference_path): if isinstance(reference_path, str): return resolve_string_parameter(reference_path, self.root_obj) elif isinstance(reference_path, dict): resolved_dict = {} for key, value in list(reference_path.items()): resolved_dict[key] = self.resolve_reference(value) return resolved_dict elif isinstance(reference_path, list): resolved_list = [] for item in reference_path: resolved_list.append(self.resolve_reference(item)) return resolved_list else: return reference_path def resolve_parameters(self, parameters): actual_params = {} for parameter, reference in parameters.items(): actual_params[parameter] = self.resolve_reference(reference) return actual_params def convert_deprecated_vault_to_template(vault_reference) -> str: reference, _, conversion = vault_reference.partition(CONVERSION_TOKEN) _, _, file_id = reference.partition(VAULT_TOKEN) template = f"vault('{file_id}')" if conversion: template = template + " |fromjson" return "{" + template + "}" def convert_legacy_reference_to_template(reference_path: str) -> str: try: template = reference_path if template.startswith(PATH_TOKEN): _, _, conversion = template.partition(CONVERSION_TOKEN) template = f"context{template[1:]}" if conversion: template = template + " |fromjson" template = "{" + template + "}" elif template.startswith(VAULT_TOKEN): template = convert_deprecated_vault_to_template(template) return template except (TypeError, KeyError) as e: raise SoclessException( f"Unable to convert reference type {type(reference_path)} to template - {e}" ) def resolve_string_parameter(parameter: str, root_object: dict) -> Any: template = convert_legacy_reference_to_template(parameter) resolved = render_jinja_from_string(template, root_object) if isinstance(resolved, str): # if jsonpath renders into something with a vault_token, it needs to run through jinja again if resolved.startswith(VAULT_TOKEN): new_template_string = convert_deprecated_vault_to_template(resolved) return render_jinja_from_string(new_template_string, root_object) ## autoescaping is currently disabled, this line may not be necessary # resolved = resolved.replace(""", '"').replace("'", "'") return resolved ###################_BEGIN_SETUP_################## root_obj = { "artifacts": { "event": { "details": { "firstname": "Sterling", "middlename": "Malory", "lastname": "Archer", "vault_test" : "vault:socless_vault_tests.txt" } } } } parameters = { "firstname": "$.artifacts.event.details.firstname", "lastname": "$.artifacts.event.details.lastname", "middlename": "Malory", "vault.txt": "vault:socless_vault_tests.txt", # "vault.json": "vault:socless_vault_tests.json!json", "acquaintances": [ { "firstname": "$.artifacts.event.details.middlename", "lastname": "$.artifacts.event.details.lastname" } ] } """ testcode = """ resolver = OLDParameterResolver(root_obj) output = resolver.resolve_parameters(parameters) """ print(min(timeit.repeat(stmt=testcode, setup=import_module, repeat=25, number=1))) ## resolver = OLDParameterResolver(root_obj) # repeat=25, number=1 # 0.000019248000000082754 # ----- # repeat=2, number=10000 # 0.12176976900000003 # ----- # repeat=5, number=100 # .0014764200000000338 # ----- # repeat=5, number=1000 # 0.012429356000000003 # ___________________ ## resolver = ParameterResolver(root_obj) # repeat=25, number=1 # 0.0031444959999999966 # ----- # repeat=5, number=100 # 0.30228340200000003 # ----- # repeat=5, number=1000 # 3.0495730029999995