Skip to content

Instantly share code, notes, and snippets.

@ReinforceZwei
Created June 28, 2026 10:50
Show Gist options
  • Select an option

  • Save ReinforceZwei/6ddcafd2a9b073d79e812f89278a1352 to your computer and use it in GitHub Desktop.

Select an option

Save ReinforceZwei/6ddcafd2a9b073d79e812f89278a1352 to your computer and use it in GitHub Desktop.
Python script to get the VM/LXC list and details from Proxmox VE. Create an API token with permission to read VMs (e.g. the PVEAuditor role) and put it in a .env file next to the script (SECRET, TOKEN_ID, PVE_IP).
import argparse
import json
import os
import ssl
import sys
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse
from urllib.request import Request, urlopen
# The .env file is looked up in the same directory as this script.
ENV_PATH = Path(__file__).with_name(".env")
# TLS context with certificate verification disabled, shared by all requests.
# NOTE(review): this relies on a private ``ssl`` helper and trusts any server
# certificate; acceptable only because Proxmox hosts commonly use self-signed
# certificates.
SSL_CONTEXT = ssl._create_unverified_context()
def load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    Variables that already exist in the process environment are never
    overwritten, so real environment variables take precedence over the file.

    Args:
        path: Location of the ``.env`` file.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"Missing .env file: {path}")

    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue

        key, _, value = line.partition("=")
        key = key.strip()
        value = value.strip()

        # Remove exactly one *matching* pair of surrounding quotes. Stripping
        # quote characters unconditionally would corrupt a secret that merely
        # starts or ends with a quote.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            value = value[1:-1]

        if key and key not in os.environ:
            os.environ[key] = value
def require_env(name: str) -> str:
    """Return the whitespace-trimmed value of environment variable ``name``.

    Raises:
        ValueError: If the variable is unset or contains only whitespace.
    """
    cleaned = os.getenv(name, "").strip()
    if cleaned:
        return cleaned
    raise ValueError(f"Missing required environment variable: {name}")
def build_api_base(pve_ip: str) -> str:
    """Build the Proxmox JSON API base URL from a host, host:port or full URL.

    The scheme defaults to ``https`` and the port to ``8006`` when they are
    not given. Any path component of a supplied URL is discarded.

    Args:
        pve_ip: Host name / IP address, optionally with scheme and port.
            IPv6 literals must be written in brackets, e.g. ``[2001:db8::1]``.

    Returns:
        A URL of the form ``<scheme>://<host>:<port>/api2/json``.

    Raises:
        ValueError: If no host can be determined or the port is not numeric.
    """
    candidate = pve_ip if "://" in pve_ip else f"https://{pve_ip}"
    parsed = urlparse(candidate)
    scheme = parsed.scheme or "https"
    hostname = parsed.hostname or parsed.path
    if not hostname:
        raise ValueError(f"Cannot determine Proxmox host from: {pve_ip!r}")
    # ``parsed.hostname`` drops the brackets of an IPv6 literal; they have to
    # be restored, otherwise the appended ":port" becomes ambiguous.
    if ":" in hostname and not hostname.startswith("["):
        hostname = f"[{hostname}]"
    port = parsed.port or 8006
    return f"{scheme}://{hostname}:{port}/api2/json"
def build_headers(token_id: str, secret: str) -> dict[str, str]:
    """Return the HTTP headers for a Proxmox API-token authenticated request.

    The ``Authorization`` value has the form ``PVEAPIToken=<token_id>=<secret>``
    and JSON is requested via the ``Accept`` header.
    """
    authorization = f"PVEAPIToken={token_id}={secret}"
    return dict(Authorization=authorization, Accept="application/json")
def fetch_json(api_base: str, headers: dict[str, str], path: str) -> object:
    """GET ``api_base + path`` and return the ``data`` member of the JSON reply.

    The request uses the module-level ``SSL_CONTEXT`` and a 15 second timeout.

    Raises:
        ValueError: If the decoded reply has no ``data`` value (missing or null).
    """
    api_request = Request(f"{api_base}{path}", headers=headers, method="GET")
    # Proxmox deployments commonly use self-signed certificates.
    with urlopen(api_request, context=SSL_CONTEXT, timeout=15) as reply:
        body = json.load(reply)

    result = body.get("data")
    if result is not None:
        return result
    raise ValueError(f"Unexpected API response for {path}: missing data field")
def fetch_resources(api_base: str, headers: dict[str, str]) -> list[dict]:
    """Fetch the ``/cluster/resources?type=vm`` listing.

    Raises:
        ValueError: If the ``data`` payload is not a list.
    """
    listing = fetch_json(api_base, headers, "/cluster/resources?type=vm")
    if isinstance(listing, list):
        return listing
    raise ValueError("Unexpected API response: missing data list")
def print_resources(resources: list[dict]) -> None:
    """Print a fixed-width table of all QEMU and LXC entries in ``resources``.

    Entries of any other type are ignored. Rows are ordered by type, then by
    VMID; names longer than 24 characters are truncated. A short notice is
    printed instead of the table when nothing qualifies.
    """
    guests = [entry for entry in resources if entry.get("type") in {"qemu", "lxc"}]
    if not guests:
        print("No VMs or LXCs found.")
        return

    def by_type_then_vmid(entry: dict) -> tuple:
        return entry.get("type", ""), entry.get("vmid", 0)

    print(f"{'TYPE':<6} {'VMID':<6} {'NAME':<24} {'NODE':<12} STATUS")
    print("-" * 64)

    guests.sort(key=by_type_then_vmid)
    for guest in guests:
        vm_type = str(guest.get("type", "")).upper()
        vmid = str(guest.get("vmid", ""))
        # Fall back to the resource id when the guest has no name.
        name = str(guest.get("name") or guest.get("id") or "")
        node = str(guest.get("node", ""))
        status = str(guest.get("status", "unknown"))
        print(f"{vm_type:<6} {vmid:<6} {name:<24.24} {node:<12} {status}")
def parse_args() -> argparse.Namespace:
    """Parse the command line: one optional integer positional, ``vmid``.

    ``vmid`` is ``None`` when omitted, which callers treat as "list everything".
    """
    cli = argparse.ArgumentParser(
        description="List Proxmox VMs/LXCs or show details for a specific VMID."
    )
    vmid_options = {
        "nargs": "?",
        "type": int,
        "help": "VMID to show details for. Omit to list all VMs/LXCs.",
    }
    cli.add_argument("vmid", **vmid_options)
    return cli.parse_args()
def find_instance(resources: list[dict], vmid: int) -> dict:
    """Return the first QEMU/LXC entry in ``resources`` whose ``vmid`` matches.

    Raises:
        ValueError: If no such entry exists.
    """
    matches = (
        entry
        for entry in resources
        if entry.get("type") in {"qemu", "lxc"} and entry.get("vmid") == vmid
    )
    found = next(matches, None)
    if found is None:
        raise ValueError(f"VMID {vmid} was not found in cluster resources.")
    return found
def format_bytes(value: object) -> str:
    """Render a byte count with binary units (B up to TiB), one decimal place.

    Returns ``"unknown"`` for non-numeric or negative input. Values beyond
    the TiB range are still expressed in TiB.
    """
    if not isinstance(value, (int, float)) or value < 0:
        return "unknown"

    amount = float(value)
    for unit in ("B", "KiB", "MiB", "GiB"):
        # Stop at the first unit in which the amount no longer reaches 1024.
        if not amount >= 1024:
            return f"{amount:.1f} {unit}"
        amount /= 1024
    return f"{amount:.1f} TiB"
def format_percentage(value: object) -> str:
    """Render a numeric fraction (``0.5`` -> ``"50.0%"``), else ``"unknown"``."""
    if isinstance(value, (int, float)):
        return f"{value * 100:.1f}%"
    return "unknown"
def normalize_qemu_agent_data(data: object) -> list[dict]:
    """Return the interface dicts contained in a guest-agent reply.

    Accepts either a bare list or a dict wrapping the list under ``"result"``.
    Non-dict list members are dropped; any other shape yields an empty list.
    """
    entries = data.get("result") if isinstance(data, dict) else data
    if not isinstance(entries, list):
        return []
    return [entry for entry in entries if isinstance(entry, dict)]
def extract_qemu_ips(data: object) -> tuple[list[str], str | None]:
    """Collect non-loopback guest IPs from a QEMU guest-agent interface reply.

    Returns:
        ``(addresses, note)``. ``addresses`` is a sorted, de-duplicated list of
        ``"<ip> (<label>)"`` strings, where the label is the interface name,
        else the address type, else ``"interface"``. ``note`` is ``None`` when
        at least one address was found, otherwise an explanatory message.
    """
    found: set[str] = set()
    for nic in normalize_qemu_agent_data(data):
        nic_name = str(nic.get("name", ""))
        for entry in nic.get("ip-addresses", []):
            if not isinstance(entry, dict):
                continue
            address = str(entry.get("ip-address", ""))
            family = str(entry.get("ip-address-type", ""))
            is_loopback = address.startswith("127.") or address == "::1"
            if address and not is_loopback:
                found.add(f"{address} ({nic_name or family or 'interface'})")

    if found:
        return sorted(found), None
    return [], "No guest IPs reported by QEMU guest agent."
def extract_lxc_ips(data: object) -> tuple[list[str], str | None]:
if not isinstance(data, list):
return [], "Unexpected LXC interfaces response."
ips: list[str] = []
for interface in data:
if not isinstance(interface, dict):
continue
name = str(interface.get("name", ""))
for field in ("inet", "inet6"):
values = interface.get(field, [])
if isinstance(values, str):
values = [values]
if not isinstance(values, list):
continue
for value in values:
address = str(value).strip()
if not address:
continue
address_only = address.split("/", 1)[0]
if address_only.startswith("127.") or address_only == "::1":
continue
ips.append(f"{address} ({name or field})")
return sorted(set(ips)), None if ips else "No container IPs reported by PVE."
def fetch_ip_addresses(
api_base: str,
headers: dict[str, str],
instance: dict,
) -> tuple[list[str], str | None]:
node = str(instance.get("node", ""))
vmid = int(instance["vmid"])
vm_type = str(instance.get("type", ""))
if vm_type == "qemu":
path = f"/nodes/{node}/qemu/{vmid}/agent/network-get-interfaces"
extractor = extract_qemu_ips
unavailable_hint = "QEMU guest agent may be disabled, missing, or not permitted by the token."
elif vm_type == "lxc":
path = f"/nodes/{node}/lxc/{vmid}/interfaces"
extractor = extract_lxc_ips
unavailable_hint = "Container interface data may require extra permissions."
else:
return [], f"Unsupported VM type: {vm_type}"
try:
data = fetch_json(api_base, headers, path)
except HTTPError as error:
return [], f"IP lookup failed with HTTP {error.code}: {error.reason}. {unavailable_hint}"
except URLError as error:
return [], f"IP lookup failed: {error.reason}"
return extractor(data)
def print_instance_details(
    api_base: str,
    headers: dict[str, str],
    instance: dict,
) -> None:
    """Print status, CPU, memory and IP details for one VM or container.

    Reads ``status/current`` and ``config`` of the guest, then performs a
    best-effort IP lookup via ``fetch_ip_addresses``.

    Args:
        api_base: Base URL of the Proxmox JSON API.
        headers: HTTP headers carrying the API token.
        instance: Cluster-resource entry of the guest (needs ``vmid``; ``node``,
            ``type`` and ``name``/``id`` are used when present).

    Raises:
        ValueError: If either API reply is not a JSON object.
    """
    node = str(instance.get("node", ""))
    vmid = int(instance["vmid"])
    vm_type = str(instance.get("type", ""))
    name = str(instance.get("name") or instance.get("id") or "")

    current = fetch_json(api_base, headers, f"/nodes/{node}/{vm_type}/{vmid}/status/current")
    config = fetch_json(api_base, headers, f"/nodes/{node}/{vm_type}/{vmid}/config")
    if not isinstance(current, dict):
        raise ValueError("Unexpected API response: status/current did not return an object")
    if not isinstance(config, dict):
        raise ValueError("Unexpected API response: config did not return an object")

    ip_addresses, ip_note = fetch_ip_addresses(api_base, headers, instance)

    # Prefer the configured core count, then the CPU limit, then the value
    # reported by the live status.
    configured_cpu = config.get("cores") or config.get("cpulimit") or current.get("cpus")
    configured_ram_mib = config.get("memory")
    current_mem = current.get("mem")
    max_mem = current.get("maxmem")

    # Fields absent from the replies are shown as "unknown" rather than as the
    # literal text "None".
    cpu_text = "unknown" if configured_cpu is None else f"{configured_cpu} cores"
    ram_text = "unknown" if configured_ram_mib is None else f"{configured_ram_mib} MiB"

    print(f"Type: {vm_type.upper()}")
    print(f"VMID: {vmid}")
    print(f"Name: {name}")
    print(f"Node: {node}")
    print(f"Status: {current.get('status', 'unknown')}")
    print(f"CPU: {cpu_text}")
    print(f"CPU usage: {format_percentage(current.get('cpu'))}")
    print(f"RAM configured: {ram_text}")
    print(f"RAM usage: {format_bytes(current_mem)} / {format_bytes(max_mem)}")

    if ip_addresses:
        print("IP addresses:")
        for address in ip_addresses:
            print(f" - {address}")
    else:
        print(f"IP addresses: {ip_note}")
def main() -> int:
    """Run the command-line tool and return a process exit status.

    Lists every VM/LXC when no VMID was given on the command line, otherwise
    prints the details of that guest. Every failure is reported on stderr.

    Returns:
        ``0`` on success, ``1`` after any handled error.
    """
    try:
        args = parse_args()
        load_env_file(ENV_PATH)
        # Evaluated lazily in order, so the first missing variable is reported.
        secret, token_id, pve_ip = (
            require_env(key) for key in ("SECRET", "TOKEN_ID", "PVE_IP")
        )
        api_base = build_api_base(pve_ip)
        headers = build_headers(token_id, secret)
        resources = fetch_resources(api_base, headers)
        if args.vmid is not None:
            selected = find_instance(resources, args.vmid)
            print_instance_details(api_base, headers, selected)
        else:
            print_resources(resources)
    except (FileNotFoundError, ValueError) as error:
        print(error, file=sys.stderr)
    except HTTPError as error:
        # Must precede URLError, which is its base class.
        print(f"HTTP error {error.code}: {error.reason}", file=sys.stderr)
    except URLError as error:
        print(f"Connection error: {error.reason}", file=sys.stderr)
    except TimeoutError:
        print("Connection timed out.", file=sys.stderr)
    except Exception as error:
        print(f"Unexpected error: {error}", file=sys.stderr)
    else:
        return 0
    return 1
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment