Created
October 7, 2025 14:30
-
-
Save tarruda/0c1d299d399db3050fadc21b8a74edf5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import argparse | |
| import hashlib | |
| import json | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| import time | |
| import yaml | |
| from pathlib import Path | |
| from typing import Any, Mapping | |
| from urllib.parse import urlparse | |
| import jinja2 | |
| # ---------------------------------------------------------------------- | |
| # Embedded Jinja2 Macros | |
| # ---------------------------------------------------------------------- | |
# Raw string holding the `service` and `deployment` macros that are injected
# into every template environment as globals (see create_jinja_env).
DEFAULT_MACROS = r"""
{% macro service(name, selector_app, ports, type='ClusterIP') -%}
apiVersion: v1
kind: Service
metadata:
  name: {{ name }}
spec:
  selector:
    app: {{ selector_app }}
  type: {{ type }}
  ports:
{% for port in ports %}
  - name: {{ port.name }}
    protocol: {{ port.protocol | default('TCP') }}
    port: {{ port.port }}
    targetPort: {{ port.targetPort }}
{% if (type == 'NodePort' or type == 'LoadBalancer') and port.nodePort is defined %}
    nodePort: {{ port.nodePort }}
{%- endif %}
{%- endfor %}
{%- endmacro %}
{% macro deployment(name, containers, replicas=1, volumes=[], extra_labels={}, bind_mounts=[]) -%}
{#-
Kubernetes Deployment macro with hostPath PV/PVC support.
-#}
{% set generated_volumes = [] %}
{% for mount in bind_mounts %}
{% set volume_name = (name + '-' + mount.hostPath) | regex_replace('[^a-zA-Z0-9]+', '-') | lower | trim('-') %}
apiVersion: v1
kind: PersistentVolume
metadata:
  name: {{ volume_name }}-pv
spec:
  capacity:
    storage: 1Ki
  accessModes: ['ReadWriteMany']
  persistentVolumeReclaimPolicy: Retain
  storageClassName: ""
  hostPath:
    path: {{ mount.hostPath }}
    type: DirectoryOrCreate
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: {{ volume_name }}-pvc
spec:
  storageClassName: ""
  accessModes: ['ReadWriteMany']
  resources:
    requests:
      storage: 1Ki
{% set _ = generated_volumes.append({
    'name': volume_name,
    'persistentVolumeClaim': {'claimName': volume_name + '-pvc'}
}) %}
---
{% endfor %}
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ name }}
spec:
  replicas: {{ replicas }}
  selector:
    matchLabels:
      app: {{ name }}
  template:
    metadata:
      labels:
        app: {{ name }}
        {% for key, value in extra_labels.items() %}{{ key }}: {{ value }}{% endfor %}
    spec:
      containers:
{% for container in containers %}
{#- FIX: resolve the container's effective name ONCE here, while `loop` still
    refers to the containers loop. The previous code re-evaluated
    `container.name if ... else (name if loop.first)` inside the bind_mounts
    loop below, where `loop` is the *mounts* loop, so the default name was
    wrong for every mount after the first. -#}
{% set container_name = container.name if container.name is defined else (name if loop.first) %}
      - name: {{ container_name }}
        image: {{ container.image }}
        imagePullPolicy: {{ container.imagePullPolicy | default('Always') }}
{% if container.command is defined %}
        command: {{ container.command | tojson }}
{% endif %}
{% if container.args is defined %}
        args: {{ container.args | tojson }}
{% endif %}
{% if container.ports is defined %}
        ports:
{% for port in container.ports %}
        - containerPort: {{ port }}
{% endfor %}
{% endif %}
{% set dynamic_mounts = [] %}
{% for mount in bind_mounts %}
{% if mount.containers is not defined or container_name in mount.containers %}
{% set volume_name = (name + '-' + mount.hostPath) | regex_replace('[^a-zA-Z0-9]+', '-') | lower | trim('-') %}
{% set _ = dynamic_mounts.append({
    'name': volume_name,
    'mountPath': mount.containerPath
}) %}
{% endif %}
{% endfor %}
{% set container_volume_mounts = container.get('volumeMounts', []) %}
{% if container_volume_mounts or dynamic_mounts %}
        volumeMounts: {{ (dynamic_mounts + container_volume_mounts) | tojson }}
{% endif %}
{% if container.env is defined %}
        env: {{ container.env | tojson }}
{% endif %}
{% if container.resources is defined %}
        resources: {{ container.resources | tojson }}
{% endif %}
{% endfor %}
{% if volumes or generated_volumes %}
      volumes: {{ (generated_volumes + volumes) | tojson }}
{%- endif %}
{%- endmacro %}
"""
| # ---------------------------------------------------------------------- | |
| # Kubernetes Manifests for In-Cluster Registry | |
| # ---------------------------------------------------------------------- | |
# Multi-document YAML for bootstrapping the in-cluster image registry:
#   Namespace + PVC (local-path, 15Gi) + Deployment (registry:2) +
#   ClusterIP Service on :5000 + a socat DaemonSet that proxies hostPort 5000
#   on every node to the Service, so `podman push <node>:5000/...` works.
# Parsed with yaml.safe_load_all in ensure_insecure_registry().
REGISTRY_MANIFESTS = """
apiVersion: v1
kind: Namespace
metadata:
  name: in-cluster-registry
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: in-cluster-registry-pvc
  namespace: in-cluster-registry
spec:
  accessModes:
    - ReadWriteOnce
  storageClassName: local-path
  resources:
    requests:
      storage: 15Gi
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: in-cluster-registry
  namespace: in-cluster-registry
  labels:
    app: in-cluster-registry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: in-cluster-registry
  template:
    metadata:
      labels:
        app: in-cluster-registry
    spec:
      volumes:
        - name: registry-storage
          persistentVolumeClaim:
            claimName: in-cluster-registry-pvc
      containers:
        - name: registry
          image: registry:2
          ports:
            - containerPort: 5000
          volumeMounts:
            - name: registry-storage
              mountPath: /var/lib/registry
---
apiVersion: v1
kind: Service
metadata:
  name: in-cluster-registry-service
  namespace: in-cluster-registry
spec:
  selector:
    app: in-cluster-registry
  # type: ClusterIP is the default and is all that's needed now.
  type: ClusterIP
  ports:
    - name: registry-port
      protocol: TCP
      port: 5000
      targetPort: 5000
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: in-cluster-registry-proxy
  namespace: in-cluster-registry
  labels:
    app: in-cluster-registry-proxy
spec:
  selector:
    matchLabels:
      app: in-cluster-registry-proxy
  template:
    metadata:
      labels:
        app: in-cluster-registry-proxy
    spec:
      # Use a high priority to ensure this critical networking pod is scheduled.
      priorityClassName: system-node-critical
      containers:
        - name: socat-proxy
          image: alpine/socat:1.7.4.4-r0
          args:
            # Listen on port 5000 and forward to the internal service FQDN
            - TCP4-LISTEN:5000,fork,reuseaddr
            - TCP4:in-cluster-registry-service.in-cluster-registry.svc.cluster.local:5000
          ports:
            - name: registry-proxy
              # This binds port 5000 on the host node's network interface.
              hostPort: 5000
              containerPort: 5000
              protocol: TCP
"""
| # ---------------------------------------------------------------------- | |
| # Utilities | |
| # ---------------------------------------------------------------------- | |
| def run_command(cmd: list[str], text: bool = True, check: bool = True, capture_output: bool = True, | |
| cwd: Path | None = None, dry_run: bool = False, extra_env: dict | None = None) -> subprocess.CompletedProcess: | |
| """Helper to run external commands.""" | |
| if dry_run: | |
| print(f" DRY-RUN: Would run command: {' '.join(cmd)}") | |
| return subprocess.CompletedProcess(cmd, 0, "", "") | |
| env = None | |
| if extra_env: | |
| env = os.environ.copy() | |
| env.update(extra_env) | |
| return subprocess.run(cmd, text=text, check=check, capture_output=capture_output, cwd=cwd, env=env) | |
def run_kubectl(args: list[str], context: str | None = None) -> tuple[int, str, str]:
    """Invoke kubectl (optionally with --context) and return (rc, stdout, stderr).

    Output is whitespace-stripped; this never raises on a non-zero exit.
    """
    full_cmd = ["kubectl"] + (["--context", context] if context else []) + args
    result = subprocess.run(full_cmd, capture_output=True, text=True)
    return result.returncode, result.stdout.strip(), result.stderr.strip()
| # ---------------------------------------------------------------------- | |
| # Jinja2 / YAML | |
| # ---------------------------------------------------------------------- | |
def create_jinja_env(config_vars: Mapping[str, Any] | None = None) -> jinja2.Environment:
    """Build the Jinja2 environment used to render manifest templates.

    Installs a `regex_replace` filter, exposes every public macro from
    DEFAULT_MACROS as a global, and merges user-supplied variables.
    StrictUndefined makes typos in templates fail loudly.
    """
    environment = jinja2.Environment(
        trim_blocks=True, lstrip_blocks=True, undefined=jinja2.StrictUndefined)
    environment.filters['regex_replace'] = lambda s, find, replace: re.sub(find, replace, s)
    macro_module = environment.from_string(DEFAULT_MACROS).module
    for attr in dir(macro_module):
        if not attr.startswith('_'):
            environment.globals[attr] = getattr(macro_module, attr)
    if config_vars:
        environment.globals.update(config_vars)
    return environment
def yaml_dump_all(docs: list[Mapping[str, Any]]) -> str:
    """Serialize documents into one multi-document YAML stream ('---' separated)."""
    rendered = [yaml.safe_dump(doc, sort_keys=False) for doc in docs]
    return "\n---\n".join(rendered)
def load_manifests_from_dir(root: Path, jinja_env: jinja2.Environment) -> list[Mapping[str, Any]]:
    """Recursively load YAML manifests under root, rendering *.j2 files first.

    Hidden files are skipped; empty YAML documents are dropped.
    """
    templated = (".yaml.j2", ".yml.j2")
    accepted = (".yaml", ".yml") + templated
    manifests: list[Mapping[str, Any]] = []
    for path in sorted(root.rglob("*")):
        if path.name.startswith(".") or not path.name.endswith(accepted):
            continue
        content = path.read_text(encoding="utf-8")
        if path.name.endswith(templated):
            content = jinja_env.from_string(content).render()
        manifests.extend(doc for doc in yaml.safe_load_all(content) if doc)
    return manifests
| # ---------------------------------------------------------------------- | |
| # Config File | |
| # ---------------------------------------------------------------------- | |
def load_config_file(root: Path) -> dict[str, Any]:
    """Load the optional .config.yml file from the root directory.

    Returns an empty dict when the file does not exist.
    Raises RuntimeError on parse failure or a non-mapping top level.
    """
    config_path = root / ".config.yml"
    if not config_path.exists():
        return {}
    with config_path.open("r", encoding="utf-8") as fh:
        try:
            data = yaml.safe_load(fh) or {}
        except yaml.YAMLError as e:
            # Chain the cause so the traceback keeps the original YAML error.
            raise RuntimeError(f"Failed to parse {config_path}: {e}") from e
    if not isinstance(data, dict):
        raise RuntimeError(f"{config_path} must contain a YAML mapping")
    return data
| # ---------------------------------------------------------------------- | |
| # Metadata / Hashing | |
| # ---------------------------------------------------------------------- | |
def clean_metadata(meta: dict[str, Any]) -> dict[str, Any]:
    """Strip server-populated fields and this tool's own markers from metadata.

    Applied before hashing so a cluster round-trip does not change the sha.
    """
    server_fields = {"uid", "resourceVersion", "generation", "creationTimestamp", "managedFields"}
    cleaned = {key: value for key, value in meta.items() if key not in server_fields}
    if "annotations" in cleaned:
        cleaned["annotations"] = {
            key: value for key, value in cleaned["annotations"].items()
            if not key.endswith("/manifest-sha")
        }
    if "labels" in cleaned:
        cleaned["labels"] = {
            key: value for key, value in cleaned["labels"].items()
            if not key.endswith("/managed-by")
        }
    return cleaned
def compute_object_sha(obj: Mapping[str, Any]) -> str:
    """Return a stable sha256 of the object, ignoring volatile metadata fields."""
    normalized = dict(obj)
    if "metadata" in normalized:
        normalized["metadata"] = clean_metadata(dict(normalized["metadata"]))
    # Compact, key-sorted JSON gives a canonical byte representation.
    payload = json.dumps(normalized, separators=(",", ":"), sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()
def add_sync_metadata(obj: Mapping[str, Any], project_name: str) -> dict[str, Any]:
    """Return a copy of obj tagged with this tool's tracking metadata.

    Adds a `<project>/manifest-sha` annotation (hash computed *before* the
    markers are added — compute_object_sha also filters them defensively)
    and a `<project>/managed-by` label used for cluster discovery.
    """
    obj = dict(obj)
    # FIX: the previous shallow copy + setdefault wrote the annotation and
    # label into the caller's nested metadata dicts as a side effect.
    # Copy each nested level before mutating.
    meta = dict(obj.get("metadata") or {})
    annotations = dict(meta.get("annotations") or {})
    labels = dict(meta.get("labels") or {})
    meta["annotations"] = annotations
    meta["labels"] = labels
    obj["metadata"] = meta
    sha = compute_object_sha(obj)
    annotations[f"{project_name}/manifest-sha"] = sha
    labels[f"{project_name}/managed-by"] = project_name
    return obj
def build_key(obj: Mapping[str, Any]) -> str:
    """Return the 'Kind/name' identity key used to match cluster objects."""
    metadata = obj.get("metadata", {})
    return "{}/{}".format(obj.get("kind", "Unknown"), metadata.get("name", "unknown"))
| # ---------------------------------------------------------------------- | |
| # Resource Discovery | |
| # ---------------------------------------------------------------------- | |
# Substrings of kubectl api-resource names that should never be managed:
# ephemeral, cluster-internal, controller-owned, or read-only kinds.
_IGNORED_RESOURCE_PATTERNS = (
    "event",
    "componentstatus",
    "node",
    "endpoint",
    "binding",
    "replica",
    "review",
    "request",
    "helm",
    "ipaddress",
    "networkpoli",
    "lease",
    "registration",
)
IGNORE_RESOURCES = re.compile("|".join(_IGNORED_RESOURCE_PATTERNS), re.IGNORECASE)
def discover_resource_types(context: str | None) -> list[str]:
    """List kubectl api-resource names, excluding types we never manage."""
    rc, out, err = run_kubectl(["api-resources", "-o", "name"], context)
    if rc != 0 or not out:
        raise RuntimeError(f"kubectl api-resources failed: {err}")
    candidates = (line.strip() for line in out.splitlines())
    return [name for name in candidates if name and not IGNORE_RESOURCES.search(name)]
def get_managed_resources(context: str | None, project_name: str) -> dict[str, dict[str, Any]]:
    """Fetch every cluster object carrying this project's managed-by label.

    Returns a mapping of 'Kind/name' -> {'manifest': obj, 'sha': annotation},
    where sha may be None if the annotation is missing.
    """
    label_selector = f"{project_name}/managed-by={project_name}"
    resource_types = discover_resource_types(context)
    rc, out, err = run_kubectl(
        ["get", ",".join(resource_types), "-l", label_selector, "-o", "json"], context)
    if rc != 0 or not out:
        # "No resources found" on stderr is a normal empty result, not an error.
        if "No resources found" not in err:
            raise RuntimeError(f"Failed to get managed resources: {err}")
        return {}
    sha_annotation = f"{project_name}/manifest-sha"
    managed: dict[str, dict[str, Any]] = {}
    for item in json.loads(out).get("items", []):
        annotations = item.get("metadata", {}).get("annotations", {})
        managed[build_key(item)] = {"manifest": item, "sha": annotations.get(sha_annotation)}
    return managed
| # ---------------------------------------------------------------------- | |
| # Image Building | |
| # ---------------------------------------------------------------------- | |
def discover_image_dirs(root: Path) -> list[Path]:
    """Find all buildable *.image directories directly under root.

    A directory qualifies only if it contains a Dockerfile or Containerfile;
    others are reported and skipped.
    """
    found: list[Path] = []
    for entry in root.iterdir():
        eligible = (entry.is_dir()
                    and entry.name.endswith(".image")
                    and not entry.name.startswith("."))
        if not eligible:
            continue
        if not any((entry / candidate).exists() for candidate in ("Dockerfile", "Containerfile")):
            print(f" ⚠️ Skipping {entry.name}: No Dockerfile or Containerfile found.")
            continue
        found.append(entry)
    return found
def get_node_info(context: str | None) -> tuple[str, str]:
    """Return (name, internal IP) of the first node whose last condition is Ready."""
    jsonpath = (
        "{.items[?(@.status.conditions[-1].type=='Ready')].metadata.name},"
        "{.items[?(@.status.conditions[-1].type=='Ready')].status.addresses[?(@.type=='InternalIP')].address}"
    )
    rc, out, err = run_kubectl(["get", "nodes", "-o", f"jsonpath={jsonpath}"], context)
    if rc != 0 or not out or "," not in out:
        raise RuntimeError(f"Failed to get k3s node name and IP: {err}")
    # jsonpath emits space-separated lists on each side of the comma;
    # take the first entry of each.
    names, ips = out.split(',')
    return names.split(' ')[0], ips.split(' ')[0]
def get_kubectl_host(context: str | None) -> str:
    """Return the API server hostname for the active kubectl context."""
    rc, out, err = run_kubectl(
        ["config", "view", "--minify", "-o", "jsonpath={.clusters[0].cluster.server}"], context)
    if rc != 0 or not out:
        raise RuntimeError(f"Could not determine kubectl server host: {err}")
    hostname = urlparse(out).hostname
    if not hostname:
        raise RuntimeError(f"Could not parse hostname from kubectl server URL: {out}")
    return hostname
def ensure_insecure_registry(context: str | None, dry_run: bool, kubectl_host: str):
    """Ensure the in-cluster registry is deployed and print one-time setup steps.

    Deploys REGISTRY_MANIFESTS if the registry Deployment is absent, waits for
    the rollout, then prints the host/cluster configuration the user must do
    once to allow insecure (plain-HTTP) pushes and pulls.
    """
    namespace = "in-cluster-registry"
    deployment_name = "in-cluster-registry"
    rc, _, _ = run_kubectl(["get", "deployment", deployment_name, "-n", namespace], context)
    if rc == 0:
        print(" ✅ In-cluster registry already exists.")
    else:
        print(" 🏗️ Creating in-cluster registry...")
        manifests = [doc for doc in yaml.safe_load_all(REGISTRY_MANIFESTS) if doc]
        kubectl_apply(manifests, context, dry_run, quiet=True)
        if not dry_run:
            print(" ⏳ Waiting for registry to become available...")
            # BUG FIX: run_kubectl never raises, so the previous try/except
            # around it was dead code and a failed rollout went undetected.
            # Check the return code explicitly instead.
            wait_cmd = ["wait", "--for=condition=available",
                        f"deployment/{deployment_name}", "-n", namespace, "--timeout=120s"]
            rc, _, err = run_kubectl(wait_cmd, context)
            if rc != 0:
                raise RuntimeError(f"In-cluster registry deployment failed to start: {err}")
            time.sleep(5)
    # --- Print one-time setup instructions ---
    # NOTE(review): node_ip is unused below, but the call also validates that
    # a Ready node exists (it raises otherwise) — kept for that side effect.
    _, node_ip = get_node_info(context)
    push_addr = f"{kubectl_host}:5000"
    internal_addr = f"in-cluster-registry-service.{namespace}.svc.cluster.local:5000"
    print("\n" + "="*80)
    print("🎉 ACTION REQUIRED: ONE-TIME SETUP FOR INSECURE REGISTRY 🎉")
    print("="*80)
    print("\nYou must configure your system and your k3s cluster ONCE for this to work.")
    print("\n1. CONFIGURE YOUR LOCAL MACHINE (to enable `podman push`):")
    print("-" * 55)
    print(f" Add the following to '/etc/containers/registries.conf':")
    print("\n [[registry]]")
    print(f' location = "{push_addr}"')
    print(" insecure = true\n")
    print("\n2. CONFIGURE YOUR K3S CLUSTER (to enable image pulling):")
    print("-" * 55)
    print(f" SSH into your k3s node(s) and create/edit '/etc/rancher/k3s/registries.yaml':")
    print("\n mirrors:")
    print(f' "{internal_addr}":')
    print(f' endpoint:')
    print(f' - "http://{internal_addr}"\n')
    print(" After saving the file, restart k3s: 'sudo systemctl restart k3s'")
    print("\n" + "="*80 + "\n")
def handle_local_images(root: Path, context: str | None, dry_run: bool, registry_addr: str) -> dict:
    """Discover, build, tag and push all local *.image directories with podman.

    Returns {'images': {name: full_push_tag}} for use as template variables,
    or {} when there is nothing to build.
    Raises RuntimeError if images exist but no registry address is configured,
    if podman is missing, or if any build/push fails.
    """
    image_dirs = discover_image_dirs(root)
    if not image_dirs:
        return {}
    if not registry_addr:
        raise RuntimeError("Local images were found, but a registry address was not configured.")
    print(f"\n🔨 Building and pushing local images to registry: {registry_addr}")
    built_images = {}
    for img_dir in image_dirs:
        # FIX: removesuffix only strips the trailing ".image"; the previous
        # str.replace(".image", "") also mangled names like "a.imageb.image".
        image_name = img_dir.name.removesuffix(".image")
        simple_tag = f"{image_name}:latest"
        full_push_tag = f"{registry_addr}/{simple_tag}"
        print(f" building: {image_name} -> {full_push_tag}")
        try:
            run_command(["podman", "build", "-t", simple_tag, "."], cwd=img_dir, dry_run=dry_run)
            run_command(["podman", "tag", simple_tag, full_push_tag], cwd=img_dir, dry_run=dry_run)
            # --tls-verify=false: the in-cluster registry is plain HTTP.
            run_command(["podman", "push", "--tls-verify=false", full_push_tag], cwd=img_dir, dry_run=dry_run)
            built_images[image_name] = full_push_tag
        except FileNotFoundError as e:
            raise RuntimeError("`podman` command not found. Is it installed and in your PATH?") from e
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"Failed to build or push {image_name}:\n{e.stderr}") from e
    print(" ✅ All local images pushed successfully.")
    return {"images": built_images}
| # ---------------------------------------------------------------------- | |
| # Diff / Apply / Delete | |
| # ---------------------------------------------------------------------- | |
def diff_states(old: dict[str, dict[str, Any]], new: list[Mapping[str, Any]], project_name: str):
    """Compare cluster state against desired manifests.

    Returns (to_delete, to_apply): objects that exist in the cluster but are
    no longer declared, and objects that are new or whose manifest sha changed.
    """
    sha_annotation = f"{project_name}/manifest-sha"
    new_state: dict[str, dict[str, Any]] = {}
    for manifest in new:
        annotated = add_sync_metadata(manifest, project_name)
        new_state[build_key(annotated)] = {
            "manifest": annotated,
            "sha": annotated["metadata"]["annotations"][sha_annotation],
        }
    to_delete = [entry["manifest"] for key, entry in old.items() if key not in new_state]
    to_apply = [entry["manifest"] for key, entry in new_state.items()
                if key not in old or entry["sha"] != old[key]["sha"]]
    return to_delete, to_apply
def kubectl_apply(manifests: list[Mapping[str, Any]], context: str | None, dry_run: bool, quiet: bool=False):
    """Pipe manifests into `kubectl apply`; raises RuntimeError on failure."""
    if not manifests:
        return
    suffix = ' (dry run)' if dry_run else ''
    if not quiet:
        for obj in manifests:
            print(f" ➕ APPLY {build_key(obj)}{suffix}")
    cmd = ["kubectl"]
    if context:
        cmd += ["--context", context]
    cmd += ["apply", "-f", "-"]
    if dry_run:
        # Client-side dry run still validates the manifests.
        cmd.append("--dry-run=client")
    proc = subprocess.run(cmd, input=yaml_dump_all(manifests),
                          text=True, check=False, capture_output=True)
    if proc.returncode != 0:
        raise RuntimeError(f"kubectl apply failed:\n{proc.stderr}")
def kubectl_delete(manifests: list[Mapping[str, Any]], context: str | None, dry_run: bool):
    """Pipe manifests into `kubectl delete`; raises RuntimeError on failure.

    `--ignore-not-found` keeps already-removed objects from being an error.
    """
    if not manifests:
        return
    for obj in manifests:
        print(f" ❌ DELETE {build_key(obj)}{' (dry run)' if dry_run else ''}")
    yaml_data = yaml_dump_all(manifests)
    cmd = ["kubectl"]
    if context:
        cmd += ["--context", context]
    cmd += ["delete", "-f", "-", "--ignore-not-found=true"]
    if dry_run:
        cmd.append("--dry-run=client")
    # CONSISTENCY FIX: kubectl_apply raises on failure, but delete previously
    # ignored a non-zero exit status entirely. Capture output and surface
    # real failures (--ignore-not-found already covers the benign case).
    proc = subprocess.run(cmd, input=yaml_data, text=True, check=False, capture_output=True)
    if proc.returncode != 0:
        raise RuntimeError(f"kubectl delete failed:\n{proc.stderr}")
| # ---------------------------------------------------------------------- | |
| # Main Sync Logic | |
| # ---------------------------------------------------------------------- | |
def sync(root: Path, context: str | None, jinja_env: jinja2.Environment, dry_run: bool, project_name: str):
    """One full reconcile pass: discover, render, diff, then delete and apply."""
    print("\n🔍 Discovering managed resources in cluster...")
    current = get_managed_resources(context, project_name)
    print(f" Found {len(current)} managed resources")
    print(f"📂 Loading manifests from {root}...")
    desired = load_manifests_from_dir(root, jinja_env)
    print(f" Found {len(desired)} manifests")
    print("🔄 Computing diff...")
    to_delete, to_apply = diff_states(current, desired, project_name)
    if not (to_delete or to_apply):
        print("✅ No changes detected")
        return
    # Deletions go first so renamed objects are removed before re-creation.
    if to_delete:
        print(f"\n🗑️ Deleting {len(to_delete)} resources:")
        kubectl_delete(to_delete, context, dry_run)
    if to_apply:
        print(f"\n🚀 Applying {len(to_apply)} resources:")
        kubectl_apply(to_apply, context, dry_run)
    print(f"\n✔️ Sync completed{' (dry run)' if dry_run else ''}")
def destroy(context: str | None, dry_run: bool, project_name: str):
    """Delete every cluster object carrying this project's managed-by label."""
    print("🔍 Discovering managed resources in cluster...")
    managed = get_managed_resources(context, project_name)
    print(f" Found {len(managed)} managed resources")
    doomed = [entry["manifest"] for entry in managed.values()]
    if not doomed:
        print("✅ No managed resources found to delete")
        return
    print(f"\n🗑️ Deleting {len(doomed)} resources:")
    kubectl_delete(doomed, context, dry_run)
    print(f"\n✔️ Deletion completed{' (dry run)' if dry_run else ''}")
| # ---------------------------------------------------------------------- | |
| # CLI | |
| # ---------------------------------------------------------------------- | |
def parse_args() -> argparse.Namespace:
    """Define and parse the 'up' / 'down' command-line interface.

    Shared flags live on a hidden parent parser so both sub-commands
    accept --kubectl-context, --dry-run, --directory and --project-name.
    """
    parser = argparse.ArgumentParser(description="Simple IaC tool for Kubernetes.")
    subparsers = parser.add_subparsers(dest="command", required=True, help="Sub-command to execute")
    common = argparse.ArgumentParser(add_help=False)
    common.add_argument("--kubectl-context", dest="context", help="Kubernetes context to use.")
    common.add_argument("--dry-run", action="store_true", help="Show what would change without applying.")
    common.add_argument("--directory", default=".", help="Root directory containing manifests.")
    common.add_argument("--project-name", help="Override project name from .config.yml or directory name.")
    up_parser = subparsers.add_parser("up", parents=[common], help="Apply manifests to the cluster.")
    up_parser.add_argument("--config-var", action="append", default=[], metavar="KEY=VALUE",
                           help="Override or define template variable (can be used multiple times).")
    subparsers.add_parser("down", parents=[common], help="Delete all managed resources from the cluster.")
    return parser.parse_args()
| # ---------------------------------------------------------------------- | |
| # Entry Point | |
| # ---------------------------------------------------------------------- | |
def main() -> None:
    """CLI entry point: parse args, load optional config, then run `up` or `down`."""
    args = parse_args()
    if args.dry_run:
        print("⚠️ DRY-RUN MODE: No changes will be applied\n")
    root = Path(args.directory).resolve()
    if not root.exists():
        raise RuntimeError(f"Directory not found: {root}")
    file_config = load_config_file(root)
    # Project name precedence: CLI flag > .config.yml > directory name.
    project_name = args.project_name or file_config.get("project_name") or root.name
    if args.command == "up":
        # Template variables: .config.yml `jinja_env` section first,
        # then --config-var CLI overrides on top.
        config_vars = dict(file_config.get("jinja_env", {}) or {})
        for kv in args.config_var:
            if "=" not in kv:
                raise RuntimeError(f"Invalid --config-var '{kv}', must be KEY=VALUE")
            k, v = kv.split("=", 1)
            config_vars[k] = v
        registry_addr = config_vars.get("registry")
        image_dirs_exist = bool(discover_image_dirs(root))
        # Only bootstrap the in-cluster registry when there are local images
        # to build AND no registry was configured explicitly.
        if image_dirs_exist and not registry_addr:
            print("📦 Found local images to build. Setting up in-cluster registry...")
            kubectl_host = get_kubectl_host(args.context)
            ensure_insecure_registry(args.context, args.dry_run, kubectl_host)
            # The registry address for the push command uses the kubectl host and fixed port 5000
            registry_addr = f"{kubectl_host}:5000"
            # Make the registry address available to Jinja templates
            config_vars["registry"] = registry_addr
        # Guarantee `registry` always exists for templates, even when unused.
        config_vars.setdefault("registry", "")
        # Builds/pushes images and returns {'images': {...}} (or {} if none).
        image_vars = handle_local_images(root, args.context, args.dry_run, registry_addr)
        config_vars.update(image_vars)
        print(f"\n📦 Project: {project_name}")
        jinja_env = create_jinja_env(config_vars)
        sync(root, args.context, jinja_env, args.dry_run, project_name)
    elif args.command == "down":
        destroy(args.context, args.dry_run, project_name)
| if __name__ == "__main__": | |
| try: | |
| main() | |
| except KeyboardInterrupt: | |
| print("\n⚠️ Interrupted by user", file=sys.stderr) | |
| sys.exit(130) | |
| except Exception as e: | |
| print(f"\n❌ Error: {e}", file=sys.stderr) | |
| sys.exit(1) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{#- Static nginx site: Deployment via the shared `deployment` macro, with the
    host's /storage bind-mounted (hostPath PV/PVC) into the nginx web root. -#}
{{ deployment(
    name='nginx-static',
    containers=[{
        'image':'nginx:stable-alpine',
        'ports':[80],
    }],
    bind_mounts=[{
        'hostPath': '/storage',
        'containerPath': '/usr/share/nginx/html'
    }]
) }}
---
{#- NodePort service exposing the deployment on every node at :30080. -#}
{{ service(
    name='nginx-static-svc',
    selector_app='nginx-static',
    type='NodePort',
    ports=[{
        'name': 'http',
        'port': 80,
        'targetPort': 80,
        'nodePort': 30080
    }]
) }}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{#- Locally-built image: `images['flask-app']` is the full registry tag
    produced by handle_local_images() from the flask-app.image directory.
    The host's /storage is bind-mounted into the container. -#}
{{ deployment(
    name='python-webserver',
    containers=[{
        'image': images['flask-app'],
        'ports':[8080]
    }],
    bind_mounts=[{
        'hostPath': '/storage',
        'containerPath': '/storage'
    }]
) }}
---
{#- NodePort service: node port 30180 -> service port 80 -> container 8080. -#}
{{ service(
    name='python-webserver-svc',
    selector_app='python-webserver',
    type='NodePort',
    ports=[{
        'name': 'http',
        'protocol': 'TCP',
        'port': 80,
        'targetPort': 8080,
        'nodePort': 30180
    }]
) }}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment