Last active
February 1, 2025 22:59
-
-
Save mik-laj/3ae253d5394b34447dcbd426fd19b389 to your computer and use it in GitHub Desktop.
Helm-unittest to Python unittest migration script. Part of: https://github.com/apache/airflow/pull/11827
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import codecs | |
import re | |
from os import listdir | |
from os.path import isfile, join, dirname, abspath, basename | |
from typing import Any, Optional, Set, Dict | |
import sys | |
import black | |
from jinja2 import Template | |
import jinja2 | |
import yaml | |
# UTILS | |
def snake_to_camel(word): | |
return ''.join(x.capitalize() or '_' for x in word.split('_')) | |
def camel_to_snake(name): | |
name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name) | |
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower() | |
def escape_string_with_python_escapes(string_to_escape: Optional[str]) -> Optional[str]: | |
if not string_to_escape: | |
return None | |
escaped_bytes, _ = codecs.escape_encode(string_to_escape.encode()) # type: ignore # C-Api level | |
return "'" + escaped_bytes.decode("utf-8") + "'" # type: ignore | |
def topython(serializable_obj: Any) -> str: | |
""" | |
Serialize to Python code | |
""" | |
if isinstance(serializable_obj, jinja2.StrictUndefined): | |
# raise exception | |
str(serializable_obj) | |
if isinstance(serializable_obj, jinja2.Undefined): | |
return "None" | |
def serialize_recursively(target: Any, markers: Set[int]) -> str: | |
marker_id = id(target) | |
if marker_id in markers: | |
raise ValueError("Circular reference detected") | |
markers.add(marker_id) | |
if isinstance(target, str): | |
buf = f"{escape_string_with_python_escapes(target)}" | |
elif isinstance(target, int): | |
buf = f"{target}" | |
elif isinstance(target, dict): | |
buf = "{" | |
buf += ", ".join( | |
f"{serialize_recursively(key, markers)}: {serialize_recursively(value, markers)}" | |
for key, value in target.items() | |
) | |
buf += "}" | |
elif isinstance(target, list): | |
buf = "[" | |
buf += ", ".join(serialize_recursively(item, markers) for item in target) | |
buf += "]" | |
elif isinstance(target, set): | |
if target: | |
buf = "{" | |
buf += ", ".join(serialize_recursively(item, markers) for item in target) | |
buf += "}" | |
else: | |
buf = "set()" | |
elif isinstance(target, tuple): | |
buf = "(" | |
buf += ", ".join(serialize_recursively(item, markers) for item in target) | |
buf += ")" | |
elif target is True: | |
buf = "True" | |
elif target is False: | |
buf = "False" | |
elif target is None: | |
buf = "None" | |
else: | |
raise ValueError(f"Type '{type(target)}' is not serializable") | |
markers.remove(marker_id) | |
return buf | |
return serialize_recursively(serializable_obj, set()) | |
# MAAIN | |
PYTHON_TEMPLATE = """ | |
import unittest | |
{% if ensure_pod_template_file_is_loadable is defined and ensure_pod_template_file_is_loadable %} | |
from os import remove | |
from os.path import realpath, dirname | |
from shutil import copyfile | |
{% endif %} | |
import jmespath | |
from tests.helm_template_generator import render_chart | |
{% if ensure_pod_template_file_is_loadable is defined and ensure_pod_template_file_is_loadable %} | |
ROOT_FOLDER = realpath(dirname(realpath(__file__)) + "/..") | |
{% endif %} | |
class {{ class_name }}(unittest.TestCase): | |
{% if ensure_pod_template_file_is_loadable is defined and ensure_pod_template_file_is_loadable %} | |
def setUp(self): | |
copyfile( | |
ROOT_FOLDER + "/files/pod-template-file.kubernetes-helm-yaml", | |
ROOT_FOLDER + "/templates/pod-template-file.yaml" | |
) | |
def tearDown(self): | |
remove(ROOT_FOLDER + "/templates/pod-template-file.yaml") | |
{% endif %} | |
{% for test in tests %} | |
def {{ test.fn_name }}(self): | |
docs = render_chart( | |
values={{ test.set | default({}) | topython }}, | |
show_only={{ templates | topython }}, | |
) | |
{% for assert in test.asserts %} | |
{% set assert_type = (assert.keys() | first) %} | |
{% set assert_args = (assert.values() | first) %} | |
{% if false %} | |
# DEBUG: {{ assert_type | topython }} | |
# DEBUG: {{ assert_args | topython }} | |
{% endif %} | |
{% if assert_type == "equal" %} | |
self.assertEqual({{ assert_args.value | topython }}, jmespath.search({{ assert_args.path | topython }}, docs[0])) | |
{% elif assert_type == "notEqual" %} | |
self.assertNotEqual({{ assert_args.value | topython }}, jmespath.search({{ assert_args.path | topython }}, docs[0])) | |
{% elif assert_type == "isNotNull" %} | |
self.assertIsNotNone(jmespath.search({{ assert_args.path | topython }}, docs[0])) | |
{% elif assert_type == "matchRegex" %} | |
self.assertRegex(jmespath.search({{ assert_args.path | topython }}, docs[0]), {{ assert_args.pattern | topython }}) | |
{% elif assert_type == "contains" %} | |
self.assertIn({{ assert_args.content | topython }}, jmespath.search({{ assert_args.path | topython }}, docs[0])) | |
{% elif assert_type == "isKind" %} | |
self.assertRegex(docs[0]["kind"], {{ assert_args.of | topython }}) | |
{% elif assert_type == "hasDocuments" %} | |
self.assertEqual({{ assert_args.count | topython }}, len(docs)) | |
{% else %} | |
# Unknown type: {{ assert_type | topython }} | |
{% endif %} | |
{% endfor %} | |
{% endfor %} | |
""" | |
def create_template(): | |
template_env = jinja2.Environment(undefined=jinja2.StrictUndefined, trim_blocks=True, lstrip_blocks=True) | |
template_env.filters["topython"] = topython | |
template = template_env.from_string(PYTHON_TEMPLATE) | |
return template | |
def convert_file(template: Template, filepath: str, content: Dict) -> str: | |
filename = basename(filepath) | |
content["class_name"] = snake_to_camel(filename.replace("-", "_")[: -(len(".yaml"))]) | |
if "pod-template-file.yaml" in content["templates"]: | |
content["ensure_pod_template_file_is_loadable"] = True | |
content["templates"] = [ | |
join("templates", t).replace(".kubernetes-helm-yaml", ".yaml") for t in content["templates"] | |
] | |
for test in content["tests"]: | |
test["fn_name"] = "test_" + camel_to_snake(test["it"].replace(" ", "_")).replace("__", "_").replace( | |
"&", "and" | |
).replace(",", "") | |
result_content = template.render(**content) | |
result_content = black.format_file_contents( | |
result_content, fast=False, mode=black.FileMode(line_length=110) | |
) | |
return result_content | |
CURRENT_DIRECTORY = "/opt/airflow/chart/tests" | |
TEMPLATE = create_template() | |
def convert_directory(template: Template, directory: str, write: bool): | |
onlyfiles = [ | |
join(directory, f) for f in listdir(directory) if isfile(join(directory, f)) and f.endswith(".yaml") | |
] | |
for filepath in onlyfiles: | |
print(f"Processing file: {filepath}", file=sys.stderr) | |
try: | |
with open(filepath) as file: | |
content = yaml.safe_load(file) | |
output_filename = "test_" + basename(filepath).replace("-", "_")[: -(len(".yaml"))] + ".py" | |
output_path = dirname(filepath) + "/" + output_filename | |
result = convert_file(template, filepath, content) | |
if write: | |
with open(output_path, "w") as output_file: | |
output_file.write(result) | |
output_file.close() | |
print(f"Content saved to: {output_path}", file=sys.stderr) | |
else: | |
print(f"# {output_path}") | |
print(result) | |
except Exception as ex: # pylint: disable=broad-except | |
print("Cann't pprocess file", ex) | |
convert_directory(TEMPLATE, CURRENT_DIRECTORY, True) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess | |
import sys | |
from tempfile import NamedTemporaryFile | |
import yaml | |
def render_chart(name="RELEASE-NAME", values=None, show_only=None): | |
""" | |
Function that renders a helm chart into dictionaries. For helm chart testing only | |
""" | |
values = values or {} | |
with NamedTemporaryFile() as tmp_file: | |
content = yaml.dump(values) | |
tmp_file.write(content.encode()) | |
tmp_file.flush() | |
command = ["helm", "template", name, sys.path[0], '--values', tmp_file.name] | |
if show_only: | |
for i in show_only: | |
command.extend(["--show-only", i]) | |
templates = subprocess.check_output(command) | |
k8s_objects = yaml.load_all(templates) | |
k8s_objects = [k8s_object for k8s_object in k8s_objects if k8s_object] # type: ignore | |
return k8s_objects |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment