Created
June 28, 2026 10:50
-
-
Save ReinforceZwei/6ddcafd2a9b073d79e812f89278a1352 to your computer and use it in GitHub Desktop.
Python script to list VMs/LXC containers and show their details from Proxmox VE. Create an API token with permission to read VMs (e.g. the PVEAuditor role) and put the settings in a .env file next to the script (SECRET, TOKEN_ID, PVE_IP).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import json | |
| import os | |
| import ssl | |
| import sys | |
| from pathlib import Path | |
| from urllib.error import HTTPError, URLError | |
| from urllib.parse import urlparse | |
| from urllib.request import Request, urlopen | |
# The .env file is looked up in the same directory as this script.
ENV_PATH = Path(__file__).parent / ".env"

# Certificate verification is deliberately disabled for API requests because
# Proxmox deployments commonly use self-signed certificates.
SSL_CONTEXT = ssl._create_unverified_context()
def load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Whitespace and surrounding quote characters are stripped from
    values. Variables that already exist in the environment are left alone.

    Raises:
        FileNotFoundError: If *path* does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"Missing .env file: {path}")

    content = path.read_text(encoding="utf-8")
    for entry in map(str.strip, content.splitlines()):
        if entry == "" or entry[0] == "#" or "=" not in entry:
            continue
        # Only the first "=" separates key from value; later ones belong
        # to the value itself.
        name, _, raw_value = entry.partition("=")
        name = name.strip()
        if not name or name in os.environ:
            continue
        os.environ[name] = raw_value.strip().strip('"').strip("'")
def require_env(name: str) -> str:
    """Return the whitespace-stripped value of environment variable *name*.

    Raises:
        ValueError: If the variable is unset, empty, or only whitespace.
    """
    raw = os.environ.get(name)
    cleaned = raw.strip() if raw is not None else ""
    if cleaned:
        return cleaned
    raise ValueError(f"Missing required environment variable: {name}")
def build_api_base(pve_ip: str) -> str:
    """Build the Proxmox VE JSON API base URL from a host, host:port, or URL.

    The scheme defaults to ``https`` and the port to ``8006`` when they are
    not given. IPv6 literals (``[fd00::10]``) keep their brackets in the
    result so that host and port remain distinguishable.

    Args:
        pve_ip: Host name, IP address, ``host:port`` pair, or full URL.

    Returns:
        The base URL ending in ``/api2/json`` (no trailing slash).

    Raises:
        ValueError: If no host can be determined from *pve_ip*, or if
            ``urlparse`` rejects the port as non-numeric or out of range.
    """
    candidate = pve_ip if "://" in pve_ip else f"https://{pve_ip}"
    parsed = urlparse(candidate)
    scheme = parsed.scheme or "https"
    hostname = parsed.hostname or parsed.path
    if not hostname:
        raise ValueError(f"Cannot determine Proxmox host from: {pve_ip!r}")
    # ``parsed.hostname`` drops the brackets around IPv6 literals; restore
    # them, otherwise the address colons would collide with the port colon.
    if ":" in hostname and not hostname.startswith("["):
        hostname = f"[{hostname}]"
    port = parsed.port or 8006
    return f"{scheme}://{hostname}:{port}/api2/json"
def build_headers(token_id: str, secret: str) -> dict[str, str]:
    """Return the HTTP headers for an API-token authenticated JSON request."""
    authorization = f"PVEAPIToken={token_id}={secret}"
    headers = {"Authorization": authorization}
    headers["Accept"] = "application/json"
    return headers
def fetch_json(api_base: str, headers: dict[str, str], path: str) -> object:
    """GET ``api_base + path`` and return the ``data`` member of the JSON reply.

    The request uses a 15 second timeout and the module-level
    ``SSL_CONTEXT``, which does not verify certificates.

    Args:
        api_base: API base URL, e.g. as produced by ``build_api_base``.
        headers: HTTP headers sent with the request.
        path: API path (including any query string) appended to *api_base*.

    Returns:
        The value stored under ``"data"`` in the response object.

    Raises:
        ValueError: If the response is not a JSON object or its ``data``
            field is missing or null (invalid JSON also raises a
            ``ValueError`` subclass from ``json.load``).
        urllib.error.HTTPError, urllib.error.URLError, TimeoutError:
            Propagated from ``urlopen`` or from reading the response.
    """
    url = f"{api_base}{path}"
    request = Request(url, headers=headers, method="GET")
    # Proxmox deployments commonly use self-signed certificates.
    with urlopen(request, context=SSL_CONTEXT, timeout=15) as response:
        payload = json.load(response)
    # A JSON list/string/number has no ``.get``; report it as a malformed
    # response instead of letting an AttributeError escape.
    if not isinstance(payload, dict):
        raise ValueError(f"Unexpected API response for {path}: expected a JSON object")
    data = payload.get("data")
    if data is None:
        raise ValueError(f"Unexpected API response for {path}: missing data field")
    return data
def fetch_resources(api_base: str, headers: dict[str, str]) -> list[dict]:
    """Fetch the VM-type entries of the cluster resource list.

    Raises:
        ValueError: If the API's ``data`` payload is not a list.
    """
    resources = fetch_json(api_base, headers, "/cluster/resources?type=vm")
    if isinstance(resources, list):
        return resources
    raise ValueError("Unexpected API response: missing data list")
def print_resources(resources: list[dict]) -> None:
    """Print a table of all QEMU VMs and LXC containers in *resources*.

    Entries of any other type are ignored. Rows are ordered by type and
    then by VMID; names longer than 24 characters are truncated.
    """
    guests = [entry for entry in resources if entry.get("type") in {"qemu", "lxc"}]
    if not guests:
        print("No VMs or LXCs found.")
        return

    def sort_key(entry: dict) -> tuple:
        return entry.get("type", ""), entry.get("vmid", 0)

    guests.sort(key=sort_key)

    print(f"{'TYPE':<6} {'VMID':<6} {'NAME':<24} {'NODE':<12} STATUS")
    print("-" * 64)
    for guest in guests:
        kind = str(guest.get("type", "")).upper()
        vmid = str(guest.get("vmid", ""))
        # Fall back to the resource id when the guest has no name.
        label = str(guest.get("name") or guest.get("id") or "")
        node = str(guest.get("node", ""))
        status = str(guest.get("status", "unknown"))
        columns = (f"{kind:<6}", f"{vmid:<6}", f"{label:<24.24}", f"{node:<12}", status)
        print(" ".join(columns))
def parse_args() -> argparse.Namespace:
    """Parse the command line: one optional integer ``vmid`` positional."""
    parser = argparse.ArgumentParser(
        description="List Proxmox VMs/LXCs or show details for a specific VMID."
    )
    vmid_options = {
        "nargs": "?",
        "type": int,
        "help": "VMID to show details for. Omit to list all VMs/LXCs.",
    }
    parser.add_argument("vmid", **vmid_options)
    return parser.parse_args()
def find_instance(resources: list[dict], vmid: int) -> dict:
    """Return the first QEMU/LXC entry in *resources* whose VMID is *vmid*.

    Raises:
        ValueError: If no matching VM or container exists.
    """
    candidates = (
        entry
        for entry in resources
        if entry.get("type") in {"qemu", "lxc"} and entry.get("vmid") == vmid
    )
    match = next(candidates, None)
    if match is None:
        raise ValueError(f"VMID {vmid} was not found in cluster resources.")
    return match
def format_bytes(value: object) -> str:
    """Format a byte count with binary units (B up to TiB), one decimal.

    Returns ``"unknown"`` for non-numeric or negative input. Values of
    1024 TiB or more are still expressed in TiB.
    """
    if not isinstance(value, (int, float)) or value < 0:
        return "unknown"

    amount = float(value)
    *smaller_units, largest_unit = ("B", "KiB", "MiB", "GiB", "TiB")
    for unit in smaller_units:
        # Stop at the first unit in which the amount stays below 1024.
        if not amount >= 1024:
            return f"{amount:.1f} {unit}"
        amount /= 1024
    return f"{amount:.1f} {largest_unit}"
def format_percentage(value: object) -> str:
    """Format a fraction (``0.5`` -> ``"50.0%"``); ``"unknown"`` if not numeric."""
    if isinstance(value, (int, float)):
        return "{:.1f}%".format(value * 100)
    return "unknown"
def normalize_qemu_agent_data(data: object) -> list[dict]:
    """Return the interface dicts contained in a guest-agent response.

    Accepts either a dict whose ``"result"`` key holds the list, or the
    list itself. Non-dict list items are dropped; any other shape yields
    an empty list.
    """
    candidate = data.get("result") if isinstance(data, dict) else data
    if not isinstance(candidate, list):
        return []
    return [entry for entry in candidate if isinstance(entry, dict)]
def extract_qemu_ips(data: object) -> tuple[list[str], str | None]:
    """Collect non-loopback guest IPs from a QEMU guest-agent response.

    Returns:
        A pair ``(addresses, note)``. ``addresses`` is a sorted, de-duplicated
        list of ``"<ip> (<label>)"`` strings, where the label is the interface
        name, else the address type, else ``"interface"``. ``note`` is
        ``None`` when at least one address was found, otherwise an
        explanatory message.
    """
    collected: list[str] = []
    for nic in normalize_qemu_agent_data(data):
        nic_name = str(nic.get("name", ""))
        for entry in nic.get("ip-addresses", []):
            if not isinstance(entry, dict):
                continue
            address = str(entry.get("ip-address", ""))
            family = str(entry.get("ip-address-type", ""))
            is_loopback = address.startswith("127.") or address == "::1"
            if address and not is_loopback:
                collected.append(f"{address} ({nic_name or family or 'interface'})")

    note = None if collected else "No guest IPs reported by QEMU guest agent."
    return sorted(set(collected)), note
| def extract_lxc_ips(data: object) -> tuple[list[str], str | None]: | |
| if not isinstance(data, list): | |
| return [], "Unexpected LXC interfaces response." | |
| ips: list[str] = [] | |
| for interface in data: | |
| if not isinstance(interface, dict): | |
| continue | |
| name = str(interface.get("name", "")) | |
| for field in ("inet", "inet6"): | |
| values = interface.get(field, []) | |
| if isinstance(values, str): | |
| values = [values] | |
| if not isinstance(values, list): | |
| continue | |
| for value in values: | |
| address = str(value).strip() | |
| if not address: | |
| continue | |
| address_only = address.split("/", 1)[0] | |
| if address_only.startswith("127.") or address_only == "::1": | |
| continue | |
| ips.append(f"{address} ({name or field})") | |
| return sorted(set(ips)), None if ips else "No container IPs reported by PVE." | |
| def fetch_ip_addresses( | |
| api_base: str, | |
| headers: dict[str, str], | |
| instance: dict, | |
| ) -> tuple[list[str], str | None]: | |
| node = str(instance.get("node", "")) | |
| vmid = int(instance["vmid"]) | |
| vm_type = str(instance.get("type", "")) | |
| if vm_type == "qemu": | |
| path = f"/nodes/{node}/qemu/{vmid}/agent/network-get-interfaces" | |
| extractor = extract_qemu_ips | |
| unavailable_hint = "QEMU guest agent may be disabled, missing, or not permitted by the token." | |
| elif vm_type == "lxc": | |
| path = f"/nodes/{node}/lxc/{vmid}/interfaces" | |
| extractor = extract_lxc_ips | |
| unavailable_hint = "Container interface data may require extra permissions." | |
| else: | |
| return [], f"Unsupported VM type: {vm_type}" | |
| try: | |
| data = fetch_json(api_base, headers, path) | |
| except HTTPError as error: | |
| return [], f"IP lookup failed with HTTP {error.code}: {error.reason}. {unavailable_hint}" | |
| except URLError as error: | |
| return [], f"IP lookup failed: {error.reason}" | |
| return extractor(data) | |
def print_instance_details(
    api_base: str,
    headers: dict[str, str],
    instance: dict,
) -> None:
    """Fetch and print status, CPU, RAM and IP details for one VM/container.

    Reads the guest's ``status/current`` and ``config`` endpoints and then
    looks up its IP addresses.

    Raises:
        KeyError: If *instance* has no ``vmid`` key.
        ValueError: If either endpoint does not return a JSON object.
    """
    node = str(instance.get("node", ""))
    vmid = int(instance["vmid"])
    vm_type = str(instance.get("type", ""))
    name = str(instance.get("name") or instance.get("id") or "")

    guest_path = f"/nodes/{node}/{vm_type}/{vmid}"
    current = fetch_json(api_base, headers, f"{guest_path}/status/current")
    config = fetch_json(api_base, headers, f"{guest_path}/config")
    for payload, message in (
        (current, "Unexpected API response: status/current did not return an object"),
        (config, "Unexpected API response: config did not return an object"),
    ):
        if not isinstance(payload, dict):
            raise ValueError(message)

    ip_addresses, ip_note = fetch_ip_addresses(api_base, headers, instance)

    # Prefer the configured core count, then the CPU limit, then the value
    # reported by the live status.
    configured_cpu = config.get("cores") or config.get("cpulimit") or current.get("cpus")
    configured_ram_mib = config.get("memory")
    ram_used = format_bytes(current.get("mem"))
    ram_total = format_bytes(current.get("maxmem"))

    report = [
        f"Type: {vm_type.upper()}",
        f"VMID: {vmid}",
        f"Name: {name}",
        f"Node: {node}",
        f"Status: {current.get('status', 'unknown')}",
        f"CPU: {configured_cpu} cores",
        f"CPU usage: {format_percentage(current.get('cpu'))}",
        f"RAM configured: {configured_ram_mib} MiB",
        f"RAM usage: {ram_used} / {ram_total}",
    ]
    if ip_addresses:
        report.append("IP addresses:")
        report.extend(f" - {address}" for address in ip_addresses)
    else:
        report.append(f"IP addresses: {ip_note}")

    for line in report:
        print(line)
def main() -> int:
    """Run the CLI and return the process exit code.

    With no VMID argument the guest table is printed; with one, the details
    of that guest. Every failure is reported on stderr.

    Returns:
        ``0`` on success, ``1`` if any error occurred.
    """
    try:
        args = parse_args()
        load_env_file(ENV_PATH)
        secret = require_env("SECRET")
        token_id = require_env("TOKEN_ID")
        pve_ip = require_env("PVE_IP")

        api_base = build_api_base(pve_ip)
        headers = build_headers(token_id, secret)
        resources = fetch_resources(api_base, headers)

        if args.vmid is not None:
            instance = find_instance(resources, args.vmid)
            print_instance_details(api_base, headers, instance)
        else:
            print_resources(resources)
    except (FileNotFoundError, ValueError) as error:
        # These exceptions already carry a user-facing message.
        print(error, file=sys.stderr)
    except HTTPError as error:
        print(f"HTTP error {error.code}: {error.reason}", file=sys.stderr)
    except URLError as error:
        print(f"Connection error: {error.reason}", file=sys.stderr)
    except TimeoutError:
        print("Connection timed out.", file=sys.stderr)
    except Exception as error:
        print(f"Unexpected error: {error}", file=sys.stderr)
    else:
        return 0
    return 1
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment