Created
August 29, 2025 16:00
-
-
Save outwitevil/44d2f0291be04e1ab4d279f801587e8f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| OpenStack Projects YAML Manager | |
| This script provides a text‑based UI for creating or editing project | |
| entries inside the various OpenStack project definition YAML files. | |
| It performs a `git pull` before editing, commits the change, and offers | |
| to push the changes. | |
| Author: <your‑name> | |
| Date: 2025‑08‑29 | |
| """ | |
| import os | |
| import re | |
| import sys | |
| import pathlib | |
| import yaml | |
| from typing import Dict, Any, List | |
| import questionary | |
| from questionary import Style | |
| from git import Repo, GitCommandError | |
| # --------------------------------------------------------------------------- # | |
| # Configuration | |
| # --------------------------------------------------------------------------- # | |
# Directory that contains the YAML files (assumed to be the git repo root).
REPO_ROOT = pathlib.Path.cwd()

# Allowed project-name prefixes per environment, listed in the order the
# environment files are offered to the user.
_ENV_PREFIXES = {
    "dev": ["DEV-"],
    "lab": ["DEV-"],  # lab uses the same DEV- prefix
    "qa": ["EQ-", "QA-", "PS-", "OS-"],
    "prod": ["PROD-", "OTE-"],
    "core-nonprod": ["CORE-"],  # not specified, but assume CORE-
    "core-prod": ["CORE-"],
}

# Mapping from file name to environment info.  File names and region names
# are both derived from the environment name.
ENV_MAP = {
    f"projects-ent-{env_name}.yml": {
        "env": env_name,
        "region": f"ent-{env_name}",
        "prefixes": env_prefixes,
    }
    for env_name, env_prefixes in _ENV_PREFIXES.items()
}

# YAML constants written into every newly created entry.
DEFAULT_DOMAIN = "AUTH.EXAMPLE.COM"
DEFAULT_STATE = "present"

# Quota defaults offered when the user just presses Enter.
QUOTA_DEFAULTS = dict(
    volumes=10,
    gigabytes=10,
    security_group_rule=1000,
)
| # --------------------------------------------------------------------------- # | |
| # Helper functions | |
| # --------------------------------------------------------------------------- # | |
def git_pull(repo: Repo) -> None:
    """Pull from the ``origin`` remote of *repo*.

    Calls ``pull()`` on ``repo.remotes.origin`` with no arguments.  If the
    call raises ``GitCommandError`` the error is printed and the process
    exits with status 1.
    """
    try:
        origin_remote = repo.remotes.origin
        origin_remote.pull()
    except GitCommandError as pull_error:
        print(f"⚠️ Git pull failed: {pull_error}")
        sys.exit(1)
def git_commit(repo: Repo, message: str, files: List[pathlib.Path]) -> None:
    """Add *files* to the index of *repo* and commit them.

    Args:
        repo: Repository whose index is updated.
        message: Commit message.
        files: Paths to stage; each one is converted to ``str`` first.
    """
    staged_paths = list(map(str, files))
    repo.index.add(staged_paths)
    repo.index.commit(message)
def git_push(repo: Repo) -> None:
    """Push committed changes to origin and exit with status 1 on failure.

    A push can fail in two ways, and both are treated as errors here:

    * the ``git push`` command itself fails (``GitCommandError``), or
    * the command runs but the remote rejects one or more refs.  GitPython
      reports that through the ``ERROR`` bit of each returned ``PushInfo``
      (or an empty result list) rather than through an exception.
    """
    try:
        results = repo.remotes.origin.push()
    except GitCommandError as exc:
        print(f"⚠️ Git push failed: {exc}")
        sys.exit(1)

    # Rejected refs do not raise; they are only visible in the flags.
    rejected = [info for info in results if info.flags & info.ERROR]
    if rejected or not results:
        details = "; ".join(str(info.summary).strip() for info in rejected)
        print(f"⚠️ Git push failed: {details or 'no refs were pushed'}")
        sys.exit(1)
def load_yaml(path: pathlib.Path) -> List[Dict[str, Any]]:
    """Load the project list stored in the YAML file at *path*.

    Args:
        path: YAML file to read (UTF-8).

    Returns:
        The parsed top-level list, or an empty list when the document is
        empty.

    Raises:
        ValueError: If the document's top level is not a list.  The rest of
            the script iterates over and appends to the result, so any other
            shape would only fail later in a confusing way.
    """
    with path.open("r", encoding="utf-8") as handle:
        loaded = yaml.safe_load(handle)
    if not loaded:
        return []
    if not isinstance(loaded, list):
        raise ValueError(
            f"{path}: expected a top-level YAML list of project entries, "
            f"found {type(loaded).__name__}"
        )
    return loaded
def save_yaml(path: pathlib.Path, data: List[Dict[str, Any]]) -> None:
    """Write *data* to the YAML file at *path* (block style, key order kept).

    The document is serialized to a string before the file is opened.
    Opening with ``"w"`` truncates the file, so a serialization error must
    happen first or it would leave the file empty or half-written.
    """
    text = yaml.safe_dump(data, default_flow_style=False, sort_keys=False)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(text)
def find_entry(data: List[Dict[str, Any]], name: str) -> Dict[str, Any]:
    """Look up the body of the project called *name*.

    Each element of *data* is a mapping whose key is a project name.  The
    value stored under *name* in the first element containing it is
    returned; an empty dict is returned when no element has that key.
    """
    holder = next((item for item in data if name in item), None)
    if holder is None:
        return {}
    return holder[name]
def entry_root_key(entry: Dict[str, Any]) -> str:
    """Return the first key of *entry* (the project name).

    An empty or otherwise falsy entry yields the empty string.
    """
    if entry:
        for root_key in entry.keys():
            return root_key
    return ""
def validate_prefix(name: str, prefixes: List[str]) -> bool:
    """Return True when *name* begins with at least one of *prefixes*."""
    accepted = tuple(prefixes)
    return name.startswith(accepted)
def validate_ticket(ticket: str) -> bool:
    """Return True when *ticket* is exactly ``OSP-`` followed by ASCII digits.

    ``re.fullmatch`` is used instead of ``^...$`` because ``$`` also matches
    just before a trailing newline, which would let ``"OSP-1\\n"`` through.
    ``[0-9]`` is used instead of ``\\d`` so that non-ASCII digit characters
    are rejected.
    """
    return re.fullmatch(r"OSP-[0-9]+", ticket) is not None
def validate_prdcode(prdcode: str) -> bool:
    """Return True when *prdcode* is ``prd`` (any case) plus at least one character.

    ``re.fullmatch`` is used instead of ``^...$`` because ``$`` also matches
    just before a trailing newline, which would accept ``"prd1\\n"``.
    """
    return re.fullmatch(r"prd.+", prdcode, re.IGNORECASE) is not None
def int_input(prompt_text: str, default: int = None) -> int:
    """Prompt until the user enters a non-negative integer.

    Args:
        prompt_text: Label shown to the user.  When *default* is given,
            `` [default]`` is appended, so callers should not add it
            themselves.
        default: Value returned when the user submits an empty answer.
            ``None`` means an answer is required.

    Returns:
        The parsed integer, or *default* for an empty answer.

    If the prompt is cancelled (``ask()`` returns ``None``) the process
    exits with status 0, matching how ``main`` treats cancelled menus.
    """
    label = prompt_text if default is None else f"{prompt_text} [{default}]"
    while True:
        answer = questionary.text(label).ask()
        if answer is None:
            sys.exit(0)
        answer = answer.strip()
        if not answer and default is not None:
            return default
        # isdecimal() accepts only characters int() can parse; isdigit()
        # also accepts e.g. superscript digits, which make int() raise.
        if answer.isdecimal():
            return int(answer)
        print("❌ Please enter a non‑negative integer.")
def yes_no(prompt_text: str) -> bool:
    """Ask a yes/no question that defaults to "no".

    ``ask()`` returns ``None`` when the prompt is cancelled; that is
    reported as ``False`` so the function always returns a real ``bool``.
    """
    answer = questionary.confirm(prompt_text, default=False).ask()
    return bool(answer)
| # --------------------------------------------------------------------------- # | |
| # Main interactive flow | |
| # --------------------------------------------------------------------------- # | |
# Quota keys that have no built-in default, with their prompt labels.
_REQUIRED_QUOTAS = (
    ("cores", "Cores"),
    ("ram", "RAM (MiB)"),
    ("instances", "Instances"),
)

# Quota keys that fall back to QUOTA_DEFAULTS, with their prompt labels.
_DEFAULTED_QUOTAS = (
    ("volumes", "Volumes"),
    ("gigabytes", "Gigabytes"),
    ("security_group_rule", "Security Group Rule"),
)


def _ask_text(message: str, default: str = "") -> str:
    """Ask a free-text question; exit with status 0 if the prompt is cancelled."""
    answer = questionary.text(message, default=default).ask()
    if answer is None:
        # ask() returns None on cancel; treat it like a cancelled menu.
        sys.exit(0)
    return answer.strip()


def _ask_validated(message: str, is_valid, error_text: str, default: str = "") -> str:
    """Repeat a text question until ``is_valid(answer)`` is true."""
    while True:
        answer = _ask_text(message, default)
        if is_valid(answer):
            return answer
        print(error_text)


def _current_text(entry: Dict[str, Any], key: str) -> str:
    """Return ``entry[key]`` as text, or "" when it is missing or None."""
    value = entry.get(key)
    return "" if value is None else str(value)


def _collect_users() -> List[Dict[str, str]]:
    """Read user names until the user types 'done'."""
    users = []
    print("Type 'done' when you are finished.")
    while True:
        user = _ask_text("User name:")
        if user.lower() == "done":
            return users
        if user:
            users.append({"name": user})
        else:
            print("❌ User name cannot be empty.")


def _collect_groups() -> List[Dict[str, str]]:
    """Read one or more group names."""
    groups = []
    print("Define at least one group.")
    while True:
        group = _ask_text("Group name:")
        if not group:
            print("❌ Group name cannot be empty.")
            continue
        groups.append({"name": group})
        if not yes_no("Add another group?"):
            return groups


def _prompt_quotas(quotas: Dict[str, Any]) -> Dict[str, Any]:
    """Ask for every quota, offering values already in *quotas* as defaults.

    *quotas* is updated in place and returned.  Only the bare label is passed
    to ``int_input`` because ``int_input`` appends ``[default]`` itself.
    """
    for key, label in _REQUIRED_QUOTAS:
        quotas[key] = int_input(label, default=quotas.get(key))
    for key, label in _DEFAULTED_QUOTAS:
        quotas[key] = int_input(label, default=quotas.get(key, QUOTA_DEFAULTS[key]))
    if "security_group" in quotas:
        quotas["security_group"] = int_input(
            "Security Group", default=quotas["security_group"]
        )
    elif yes_no("Add optional 'security_group' quota?"):
        quotas["security_group"] = int_input("Security Group", default=None)
    return quotas


def _create_entry(data: List[Dict[str, Any]], env_info: Dict[str, Any]) -> str:
    """Interactively build a new project entry, append it to *data*, return its ticket."""
    prefixes = env_info["prefixes"]
    while True:
        name = _ask_text("Enter the project name:")
        if not name:
            print("❌ Name is required.")
        elif any(name in entry for entry in data):
            print("❌ Name already exists in this file.")
        elif not validate_prefix(name, prefixes):
            allowed = ", ".join(prefixes)
            print(f"❌ Name must start with one of: {allowed}")
        else:
            break

    owner = _ask_text("Owner (username):")
    ticket = _ask_validated(
        "Ticket (OSP-xxxxx):",
        validate_ticket,
        "❌ Ticket must match pattern OSP-<numbers>.",
    )
    prdcode = _ask_validated(
        "PRD code (starts with prd):",
        validate_prdcode,
        "❌ PRD code must start with 'prd'.",
    )

    users_list = _collect_users() if yes_no("Add users?") else []
    groups_list = _collect_groups()

    print("\n--- Quota Settings ---")
    quotas = _prompt_quotas({})

    # Key order here is the order the keys appear in the YAML file.
    entry_dict = {
        "regions": env_info["region"],
        "state": DEFAULT_STATE,
        "domain": DEFAULT_DOMAIN,
        "owner": owner,
        "ticket": ticket,
        "prdcode": prdcode,
    }
    if users_list:
        entry_dict["users"] = users_list
    entry_dict["groups"] = groups_list
    entry_dict["quotas"] = quotas
    if yes_no("Add 'admin_override: disabled' ?"):
        entry_dict["admin_override"] = "disabled"

    data.append({name: entry_dict})
    return ticket


def _modify_entry(data: List[Dict[str, Any]]) -> str:
    """Interactively edit one existing entry of *data* in place, return its ticket."""
    names = [name for name in (entry_root_key(entry) for entry in data) if name]
    if not names:
        print("⚠️ No entries to modify.")
        sys.exit(0)
    mod_name = questionary.select(
        "Select the project you want to modify:",
        choices=names,
    ).ask()
    if mod_name is None:
        sys.exit(0)

    entry_index = next(i for i, e in enumerate(data) if mod_name in e)
    entry_dict = data[entry_index][mod_name]
    if not isinstance(entry_dict, dict):
        # e.g. "NAME:" with no body in the YAML file
        entry_dict = {}
        data[entry_index][mod_name] = entry_dict
    print(f"\nEditing entry: {mod_name}")

    current_owner = _current_text(entry_dict, "owner")
    entry_dict["owner"] = _ask_text(
        f"Owner (username) [{current_owner}]", default=current_owner
    )

    current_ticket = _current_text(entry_dict, "ticket")
    ticket = _ask_validated(
        f"Ticket (OSP-xxxxx) [{current_ticket}]",
        validate_ticket,
        "❌ Ticket must match pattern OSP-<numbers>.",
        default=current_ticket,
    )
    entry_dict["ticket"] = ticket

    current_prdcode = _current_text(entry_dict, "prdcode")
    entry_dict["prdcode"] = _ask_validated(
        f"PRD code (starts with prd) [{current_prdcode}]",
        validate_prdcode,
        "❌ PRD code must start with 'prd'.",
        default=current_prdcode,
    )

    current_users = entry_dict.get("users") or []
    if current_users:
        print("Current users:")
        for u in current_users:
            print(f" - {u['name']}")
    else:
        print("No users currently set.")
    if yes_no("Modify users list?"):
        entry_dict["users"] = _collect_users()

    current_groups = entry_dict.get("groups") or []
    if current_groups:
        print("\nCurrent groups:")
        for g in current_groups:
            print(f" - {g['name']}")
    else:
        print("\nNo groups currently set.")
    if yes_no("Modify groups?"):
        entry_dict["groups"] = _collect_groups()

    print("\n--- Quota Settings (current values shown) ---")
    quotas = entry_dict.get("quotas")
    if not isinstance(quotas, dict):
        quotas = {}
    entry_dict["quotas"] = _prompt_quotas(quotas)

    if "admin_override" in entry_dict:
        print(f"Current admin_override: {entry_dict['admin_override']}")
    if yes_no("Add or change 'admin_override: disabled'?"):
        entry_dict["admin_override"] = "disabled"

    # entry_dict was edited in place inside data[entry_index]; nothing is
    # reassigned, so any sibling keys of that list element are kept.
    return ticket


def main() -> None:
    """Run the interactive create/modify workflow.

    Steps: open the git repository at REPO_ROOT, let the user pick an
    environment file, pull, load the YAML, create or modify one entry, write
    the file back, commit it with the ticket as the commit message, and
    optionally push.  Cancelling a prompt exits with status 0 before anything
    is written.
    """
    try:
        repo = Repo(REPO_ROOT)
    except Exception:
        print("❌ Current directory is not a git repository.")
        sys.exit(1)

    # 1. Choose a file
    chosen_file = questionary.select(
        "Select the environment file you want to edit:",
        choices=list(ENV_MAP),
    ).ask()
    if chosen_file is None:
        sys.exit(0)
    file_path = REPO_ROOT / chosen_file
    env_info = ENV_MAP[chosen_file]

    # 2. Pull latest changes
    print("\n🔄 Pulling latest changes from git…")
    git_pull(repo)

    # 3. Load existing data (checked after the pull, which may add the file)
    if not file_path.exists():
        print(f"❌ {chosen_file} was not found in {REPO_ROOT}.")
        sys.exit(1)
    data = load_yaml(file_path)

    # 4. Action: create or modify
    action = questionary.select(
        "What would you like to do?",
        choices=["Create a new entry", "Modify an existing entry"],
    ).ask()
    if action is None:
        sys.exit(0)
    if action == "Create a new entry":
        commit_msg = _create_entry(data, env_info)
    else:
        commit_msg = _modify_entry(data)

    # 5. Write back YAML
    save_yaml(file_path, data)
    print("\n✅ YAML file updated.")

    # 6. Commit
    print("\n📦 Committing changes...")
    git_commit(repo, commit_msg, [file_path])

    # 7. Prompt to push
    if yes_no("Push the commit to origin?"):
        git_push(repo)
        print("✅ Push succeeded.")
    else:
        print("🚫 Push skipped – you can push later.")
    print("\n🎉 Done!")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment