Created
March 24, 2026 14:00
-
-
Save rkok/86a303f0e1f1eedfe821c90c315d42f2 to your computer and use it in GitHub Desktop.
Uptime Kuma "API" CLI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| uptime-kuma-cli.py — Manage Uptime Kuma monitors via direct SQLite DB manipulation. | |
| Workflow: stop container → write DB → start container. | |
| Reads (--list, --export) do NOT stop/start the container — DB is opened read-only. | |
| Usage: | |
| python3 uptime-kuma-cli.py --list | |
| python3 uptime-kuma-cli.py --add '{"name": "[vps01] foo.nl", "url": "https://foo.nl"}' | |
| python3 uptime-kuma-cli.py --batch-add '[{"name": "[vps01] foo.nl", "url": "https://foo.nl"}, ...]' | |
| python3 uptime-kuma-cli.py --delete <monitorID> | |
| python3 uptime-kuma-cli.py --delete-name "[vps01] foo.nl" | |
| python3 uptime-kuma-cli.py --batch-delete '[<id>, <id>, ...]' | |
| python3 uptime-kuma-cli.py --pause <monitorID> | |
| python3 uptime-kuma-cli.py --resume <monitorID> | |
| python3 uptime-kuma-cli.py --edit <monitorID> --json '{"name": "New Name"}' | |
| python3 uptime-kuma-cli.py --export # dump all monitors as JSON | |
| python3 uptime-kuma-cli.py --import file.json # import monitors from JSON file | |
| """ | |
| import sys | |
| import json | |
| import os | |
| import argparse | |
| import logging | |
| import sqlite3 | |
| import subprocess | |
| import time | |
| # --------------------------------------------------------------------------- | |
| # Logging | |
| # --------------------------------------------------------------------------- | |
# Module-wide logger: DEBUG and above go to stderr with a millisecond timestamp.
_handler = logging.StreamHandler(sys.stderr)
_handler.setFormatter(
    logging.Formatter(
        fmt="[%(asctime)s.%(msecs)03d] %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
    )
)
log = logging.getLogger("kuma-cli")
log.setLevel(logging.DEBUG)
log.addHandler(_handler)
| # --------------------------------------------------------------------------- | |
| # Config | |
| # --------------------------------------------------------------------------- | |
# Host path of the Uptime Kuma SQLite database (inside the Docker volume).
# Set the KUMA_DB environment variable to point at a different location;
# an unset or empty variable falls back to the built-in default.
KUMA_DB = (
    os.environ.get("KUMA_DB")
    or "/var/lib/docker/volumes/uptime-kuma_uptime-kuma-data/_data/kuma.db"
)
# Name of the Docker container that is stopped/started around DB writes.
# Set the KUMA_CONTAINER environment variable to override it.
CONTAINER_NAME = os.environ.get("KUMA_CONTAINER") or "uptime-kuma"
DEFAULT_USER_ID = 1  # Uptime Kuma's default user ID
| # --------------------------------------------------------------------------- | |
| # Docker helpers | |
| # --------------------------------------------------------------------------- | |
def docker_stop():
    """Stop the container with ``sudo docker stop``.

    Prints the command's stderr and exits the process with status 1 when
    the command returns a non-zero exit code.
    """
    log.debug(f"Stopping container '{CONTAINER_NAME}'...")
    command = ["sudo", "/usr/bin/docker", "stop", CONTAINER_NAME]
    proc = subprocess.run(command, capture_output=True, text=True)
    if proc.returncode != 0:
        print(f"ERROR: Failed to stop container: {proc.stderr.strip()}", file=sys.stderr)
        sys.exit(1)
    # Give the stopped container a moment so the DB is fully flushed to disk.
    time.sleep(1)
    log.debug("Container stopped.")
def docker_start():
    """Start the container with ``sudo docker start``.

    Prints the command's stderr and exits the process with status 1 when
    the command returns a non-zero exit code.
    """
    log.debug(f"Starting container '{CONTAINER_NAME}'...")
    command = ["sudo", "/usr/bin/docker", "start", CONTAINER_NAME]
    proc = subprocess.run(command, capture_output=True, text=True)
    if proc.returncode != 0:
        print(f"ERROR: Failed to start container: {proc.stderr.strip()}", file=sys.stderr)
        sys.exit(1)
    log.debug("Container started.")
class ContainerPaused:
    """Keep the container stopped for the duration of a ``with`` block.

    Entering calls ``docker_stop()``; leaving calls ``docker_start()``,
    whether the body finished normally or raised. Exceptions raised by the
    body are never suppressed.
    """

    def __enter__(self):
        docker_stop()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        docker_start()
        # A falsy return value lets any in-flight exception propagate.
        return False
| # --------------------------------------------------------------------------- | |
| # DB helpers | |
| # --------------------------------------------------------------------------- | |
def open_db(readonly=False, path=None):
    """Open the Uptime Kuma SQLite database and return the connection.

    Args:
        readonly: When true, open through a ``file:`` URI with ``mode=ro``
            so the connection cannot modify the database.
        path: Database file to open. Defaults to ``KUMA_DB``.

    Returns:
        A ``sqlite3.Connection`` whose ``row_factory`` is ``sqlite3.Row``.
    """
    db_path = KUMA_DB if path is None else path
    if readonly:
        # Percent-encode the path so characters such as '?', '#' or '%'
        # are not misread as URI syntax.
        from urllib.parse import quote

        conn = sqlite3.connect(f"file:{quote(str(db_path))}?mode=ro", uri=True)
    else:
        conn = sqlite3.connect(db_path)
        # Switching the journal mode is a write operation, so it is only
        # attempted on writable connections.
        conn.execute("PRAGMA journal_mode=WAL")
    conn.row_factory = sqlite3.Row
    return conn
def get_all_monitors(conn):
    """Return the summary columns of every monitor as dicts, ordered by name.

    Each dict has the keys id, name, url, active, type and interval.
    Rows are converted with ``dict(row)``, so ``conn`` must produce
    mapping-like rows (e.g. ``sqlite3.Row``).
    """
    query = "SELECT id, name, url, active, type, interval FROM monitor ORDER BY name"
    monitors = []
    for record in conn.execute(query):
        monitors.append(dict(record))
    return monitors
| # --------------------------------------------------------------------------- | |
| # Monitor payload builder | |
| # --------------------------------------------------------------------------- | |
def build_monitor_row(payload):
    """Map a user-supplied monitor dict to ``monitor`` table column values.

    Both the friendly camelCase keys (``ignoreTls``, ``upsideDown``,
    ``retryInterval``, ``resendInterval``, ``accepted_statuscodes``) and the
    raw DB column names (``ignore_tls``, ``upside_down``, ``retry_interval``,
    ``resend_interval``, ``accepted_statuscodes_json``) are accepted, so a
    row dumped with ``SELECT *`` can be fed back in without losing those
    settings. When both spellings are present the friendly key wins.

    Args:
        payload: Dict describing the monitor; ``name`` is required.

    Returns:
        Dict keyed by DB column name. Values that are ``None`` mean
        "not supplied"; callers drop them before building the INSERT.

    Raises:
        KeyError: If ``payload`` has no ``name`` key.
    """

    def pick(default, *keys):
        """Return the value of the first of ``keys`` present in payload."""
        for key in keys:
            if key in payload:
                return payload[key]
        return default

    # Normalise type aliases to the names stored in the DB.
    monitor_type = payload.get("type", "http").lower()
    type_map = {"https": "http", "tcp": "port"}
    monitor_type = type_map.get(monitor_type, monitor_type)

    if "accepted_statuscodes" in payload:
        accepted_json = json.dumps(payload["accepted_statuscodes"])
    else:
        # Exported rows carry the already-serialised JSON string.
        stored = payload.get("accepted_statuscodes_json")
        if isinstance(stored, str) and stored:
            accepted_json = stored
        else:
            accepted_json = json.dumps(stored if stored else ["200-299"])

    return {
        "name": payload["name"],
        "active": int(payload.get("active", True)),
        "user_id": payload["user_id"] if "user_id" in payload else DEFAULT_USER_ID,
        "interval": payload.get("interval", 60),
        "url": payload.get("url", ""),
        "type": monitor_type,
        "maxretries": payload.get("maxretries", 3),
        "ignore_tls": int(pick(False, "ignoreTls", "ignore_tls")),
        "upside_down": int(pick(False, "upsideDown", "upside_down")),
        "maxredirects": payload.get("maxredirects", 10),
        "accepted_statuscodes_json": accepted_json,
        "dns_resolve_type": payload.get("dns_resolve_type", "A"),
        "dns_resolve_server": payload.get("dns_resolve_server", "1.1.1.1"),
        "retry_interval": pick(60, "retryInterval", "retry_interval"),
        "resend_interval": pick(0, "resendInterval", "resend_interval"),
        "method": payload.get("method", "GET"),
        "hostname": payload.get("hostname"),
        "port": payload.get("port"),
        "keyword": payload.get("keyword"),
        "body": payload.get("body"),
        "headers": payload.get("headers"),
        "basic_auth_user": payload.get("basic_auth_user"),
        "basic_auth_pass": payload.get("basic_auth_pass"),
        "description": payload.get("description"),
    }
| # --------------------------------------------------------------------------- | |
| # Commands | |
| # --------------------------------------------------------------------------- | |
def cmd_list(args):
    """Print a compact JSON array of all monitors (id, name, url, active).

    Uses a read-only connection, so the container is not stopped.
    ``args`` is accepted for a uniform command signature and is not read.
    """
    conn = open_db(readonly=True)
    monitors = get_all_monitors(conn)
    conn.close()
    summary = []
    for mon in monitors:
        summary.append({
            "id": mon["id"],
            "name": mon["name"],
            "url": mon["url"],
            "active": bool(mon["active"]),
        })
    print(json.dumps(summary))
def cmd_export(args):
    """Print every column of every monitor as indented JSON, ordered by id.

    Uses a read-only connection, so the container is not stopped.
    ``args`` is accepted for a uniform command signature and is not read.
    """
    conn = open_db(readonly=True)
    cursor = conn.execute("SELECT * FROM monitor ORDER BY id")
    dump = []
    for record in cursor.fetchall():
        dump.append(dict(record))
    conn.close()
    print(json.dumps(dump, indent=2))
def cmd_add(args):
    """Insert one monitor described by a JSON object and print the result.

    The JSON is taken from ``args.json`` or, when that is empty, from stdin.
    All input validation happens before the container is stopped. On
    success prints ``{"ok": true, "monitorID": ..., "name": ..., "url": ...}``;
    on invalid input prints an error to stderr and exits with status 1.
    """
    raw = args.json or sys.stdin.read().strip()
    if not raw:
        print("ERROR: No input provided", file=sys.stderr)
        sys.exit(1)
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as e:
        print(f"ERROR: Invalid JSON: {e}", file=sys.stderr)
        sys.exit(1)
    # A JSON array, string or number has no .get() and would crash below.
    if not isinstance(payload, dict):
        print("ERROR: Expected a JSON object", file=sys.stderr)
        sys.exit(1)
    if not payload.get("name"):
        print("ERROR: 'name' is required", file=sys.stderr)
        sys.exit(1)
    row = build_monitor_row(payload)
    # Omit unset (None) columns from the INSERT.
    columns = [k for k, v in row.items() if v is not None]
    placeholders = ", ".join("?" for _ in columns)
    values = [row[k] for k in columns]
    sql = f"INSERT INTO monitor ({', '.join(columns)}) VALUES ({placeholders})"
    with ContainerPaused():
        conn = open_db(readonly=False)
        try:
            cur = conn.execute(sql, values)
            new_id = cur.lastrowid
            conn.commit()
        finally:
            # Always release the DB handle, even if the insert fails.
            conn.close()
    out = {"ok": True, "monitorID": new_id, "name": row["name"], "url": row.get("url", "")}
    print(json.dumps(out))
def cmd_batch_add(args):
    """Add multiple monitors in one container stop/start cycle.

    The input (``args.json`` or stdin) must be a non-empty JSON array of
    monitor objects. Each entry is inserted independently; entries that are
    not objects, lack a name, or fail to insert are reported in ``results``
    without aborting the batch. Prints a JSON summary and exits with
    status 1 when any entry failed.
    """
    raw = args.json or sys.stdin.read().strip()
    if not raw:
        print("ERROR: No input provided", file=sys.stderr)
        sys.exit(1)
    try:
        monitors = json.loads(raw)
    except json.JSONDecodeError as e:
        print(f"ERROR: Invalid JSON: {e}", file=sys.stderr)
        sys.exit(1)
    if not isinstance(monitors, list) or not monitors:
        print("ERROR: Expected a non-empty JSON array", file=sys.stderr)
        sys.exit(1)
    results = []
    added = 0
    failed = 0
    with ContainerPaused():
        conn = open_db(readonly=False)
        try:
            for m in monitors:
                # A non-object entry must not abort the batch: an uncaught
                # error here would skip the commit and drop earlier inserts.
                if not isinstance(m, dict):
                    log.warning(f"Skipping non-object entry: {m}")
                    failed += 1
                    results.append({"name": "?", "ok": False, "error": "entry is not a JSON object"})
                    continue
                if not m.get("name"):
                    log.warning(f"Skipping entry without 'name': {m}")
                    failed += 1
                    results.append({"name": m.get("name", "?"), "ok": False, "error": "missing name"})
                    continue
                try:
                    row = build_monitor_row(m)
                    columns = [k for k, v in row.items() if v is not None]
                    placeholders = ", ".join("?" for _ in columns)
                    values = [row[k] for k in columns]
                    sql = f"INSERT INTO monitor ({', '.join(columns)}) VALUES ({placeholders})"
                    cur = conn.execute(sql, values)
                    new_id = cur.lastrowid
                    results.append({"name": m["name"], "url": m.get("url", ""), "ok": True, "monitorID": new_id})
                    added += 1
                    log.debug(f"Inserted '{m['name']}' → id={new_id}")
                except Exception as e:
                    # Per-entry boundary: record the failure and carry on.
                    log.error(f"Failed to insert '{m.get('name', '?')}': {e}")
                    failed += 1
                    results.append({"name": m.get("name", "?"), "url": m.get("url", ""), "ok": False, "error": str(e)})
            conn.commit()
        finally:
            conn.close()
    out = {"ok": failed == 0, "added": added, "failed": failed, "results": results}
    print(json.dumps(out))
    if failed > 0:
        sys.exit(1)
def cmd_delete(args):
    """Delete the monitor with ID ``args.id`` together with its related rows.

    Rows referencing the monitor in monitor_notification, monitor_tag,
    monitor_group and heartbeat are removed first. Prints
    ``{"ok": true, "deleted": <id>}`` on success; when no monitor has that
    ID, prints ``{"ok": false, "error": ...}`` and exits with status 1.
    """
    monitor_id = int(args.id)
    with ContainerPaused():
        conn = open_db(readonly=False)
        try:
            # Also clean up related tables
            conn.execute("DELETE FROM monitor_notification WHERE monitor_id = ?", (monitor_id,))
            conn.execute("DELETE FROM monitor_tag WHERE monitor_id = ?", (monitor_id,))
            conn.execute("DELETE FROM monitor_group WHERE monitor_id = ?", (monitor_id,))
            conn.execute("DELETE FROM heartbeat WHERE monitor_id = ?", (monitor_id,))
            cur = conn.execute("DELETE FROM monitor WHERE id = ?", (monitor_id,))
            found = cur.rowcount > 0
            conn.commit()
        finally:
            conn.close()
    # Report after the container has been restarted.
    if not found:
        print(json.dumps({"ok": False, "error": f"Monitor not found: {monitor_id}"}))
        sys.exit(1)
    print(json.dumps({"ok": True, "deleted": monitor_id}))
def cmd_delete_name(args):
    """Delete a monitor by exact name (useful when ID is unknown).

    The name is resolved through a read-only connection first, so the
    container is only stopped when there is something to delete. The first
    monitor row matching the name is deleted together with its rows in
    monitor_notification, monitor_tag, monitor_group and heartbeat.
    Prints ``{"ok": false, "error": ...}`` and exits with status 1 when no
    monitor has that name.
    """
    name = args.name
    lookup = open_db(readonly=True)
    try:
        row = lookup.execute("SELECT id FROM monitor WHERE name = ?", (name,)).fetchone()
    finally:
        lookup.close()
    if not row:
        print(json.dumps({"ok": False, "error": f"Monitor not found: {name}"}))
        sys.exit(1)
    monitor_id = row["id"]
    with ContainerPaused():
        conn = open_db(readonly=False)
        try:
            conn.execute("DELETE FROM monitor_notification WHERE monitor_id = ?", (monitor_id,))
            conn.execute("DELETE FROM monitor_tag WHERE monitor_id = ?", (monitor_id,))
            conn.execute("DELETE FROM monitor_group WHERE monitor_id = ?", (monitor_id,))
            conn.execute("DELETE FROM heartbeat WHERE monitor_id = ?", (monitor_id,))
            conn.execute("DELETE FROM monitor WHERE id = ?", (monitor_id,))
            conn.commit()
        finally:
            conn.close()
    print(json.dumps({"ok": True, "deleted": monitor_id, "name": name}))
def cmd_batch_delete(args):
    """Delete multiple monitors by ID in one container stop/start cycle.

    The input (``args.json`` or stdin) must be a JSON array of IDs. Each ID
    is deleted inside its own savepoint, so a failure part-way through one
    monitor rolls back only that monitor's deletions and the remaining IDs
    are still processed. Prints a JSON summary and exits with status 1 when
    any ID failed.
    """
    raw = args.json or sys.stdin.read().strip()
    try:
        ids = json.loads(raw)
    except json.JSONDecodeError as e:
        print(f"ERROR: Invalid JSON: {e}", file=sys.stderr)
        sys.exit(1)
    if not isinstance(ids, list):
        print("ERROR: Expected a JSON array of IDs", file=sys.stderr)
        sys.exit(1)
    deleted = []
    failed = []
    # Tables holding rows that reference a monitor through monitor_id.
    related_tables = ("monitor_notification", "monitor_tag", "monitor_group", "heartbeat")
    with ContainerPaused():
        conn = open_db(readonly=False)
        try:
            # Open the transaction explicitly so that releasing a per-ID
            # savepoint never commits on its own.
            conn.execute("BEGIN")
            for mid in ids:
                try:
                    mid = int(mid)
                except (TypeError, ValueError, OverflowError) as e:
                    log.error(f"Failed to delete id={mid}: {e}")
                    failed.append({"id": mid, "error": str(e)})
                    continue
                conn.execute("SAVEPOINT delete_monitor")
                try:
                    for table in related_tables:
                        conn.execute(f"DELETE FROM {table} WHERE monitor_id = ?", (mid,))
                    conn.execute("DELETE FROM monitor WHERE id = ?", (mid,))
                except (sqlite3.Error, OverflowError) as e:
                    # Undo the partial deletion of this monitor only.
                    conn.execute("ROLLBACK TO SAVEPOINT delete_monitor")
                    log.error(f"Failed to delete id={mid}: {e}")
                    failed.append({"id": mid, "error": str(e)})
                else:
                    deleted.append(mid)
                finally:
                    conn.execute("RELEASE SAVEPOINT delete_monitor")
            conn.commit()
        finally:
            conn.close()
    out = {"ok": len(failed) == 0, "deleted": deleted, "failed": failed}
    print(json.dumps(out))
    if failed:
        sys.exit(1)
def cmd_pause(args):
    """Set ``active = 0`` on the monitor with ID ``args.id``.

    Prints ``{"ok": true, "paused": <id>}`` on success; when no monitor has
    that ID, prints ``{"ok": false, "error": ...}`` and exits with status 1.
    """
    monitor_id = int(args.id)
    with ContainerPaused():
        conn = open_db(readonly=False)
        try:
            cur = conn.execute("UPDATE monitor SET active = 0 WHERE id = ?", (monitor_id,))
            found = cur.rowcount > 0
            conn.commit()
        finally:
            conn.close()
    # Report after the container has been restarted.
    if not found:
        print(json.dumps({"ok": False, "error": f"Monitor not found: {monitor_id}"}))
        sys.exit(1)
    print(json.dumps({"ok": True, "paused": monitor_id}))
def cmd_resume(args):
    """Set ``active = 1`` on the monitor with ID ``args.id``.

    Prints ``{"ok": true, "resumed": <id>}`` on success; when no monitor has
    that ID, prints ``{"ok": false, "error": ...}`` and exits with status 1.
    """
    monitor_id = int(args.id)
    with ContainerPaused():
        conn = open_db(readonly=False)
        try:
            cur = conn.execute("UPDATE monitor SET active = 1 WHERE id = ?", (monitor_id,))
            found = cur.rowcount > 0
            conn.commit()
        finally:
            conn.close()
    # Report after the container has been restarted.
    if not found:
        print(json.dumps({"ok": False, "error": f"Monitor not found: {monitor_id}"}))
        sys.exit(1)
    print(json.dumps({"ok": True, "resumed": monitor_id}))
def cmd_edit(args):
    """Update fields of the monitor with ID ``args.id`` from a JSON object.

    ``args.json`` maps field names to new values. Friendly camelCase keys
    are translated to DB columns; any other key is treated as a DB column
    name. Column names are validated and quoted before being placed in the
    SQL text, because they come from user input. All validation happens
    before the container is stopped.

    Prints ``{"ok": true, "monitorID": ..., "updated": {...}}`` on success.
    Exits with status 1 on invalid input, on a database error (e.g. an
    unknown column), or when no monitor has the given ID.
    """
    monitor_id = int(args.id)
    try:
        payload = json.loads(args.json)
    except json.JSONDecodeError as e:
        print(f"ERROR: Invalid JSON: {e}", file=sys.stderr)
        sys.exit(1)
    if not isinstance(payload, dict):
        print("ERROR: Expected a JSON object", file=sys.stderr)
        sys.exit(1)
    # Map friendly keys to DB columns
    field_map = {
        "name": "name", "url": "url", "active": "active",
        "interval": "interval", "type": "type", "maxretries": "maxretries",
        "ignoreTls": "ignore_tls", "upsideDown": "upside_down",
        "maxredirects": "maxredirects", "retryInterval": "retry_interval",
        "resendInterval": "resend_interval", "method": "method",
        "hostname": "hostname", "port": "port", "keyword": "keyword",
        "description": "description",
    }
    updates = {}
    for user_key, db_col in field_map.items():
        if user_key in payload:
            val = payload[user_key]
            if isinstance(val, bool):
                val = int(val)
            updates[db_col] = val
    # Direct DB column names also accepted
    for k, v in payload.items():
        if k not in field_map and k not in updates:
            updates[k] = v
    if not updates:
        print("ERROR: No valid fields to update", file=sys.stderr)
        sys.exit(1)
    # Column names end up in the SQL text itself, so only plain ASCII
    # identifiers are allowed; anything else could inject SQL.
    invalid = [col for col in updates if not (col.isascii() and col.isidentifier())]
    if invalid:
        print(f"ERROR: Invalid column name(s): {', '.join(invalid)}", file=sys.stderr)
        sys.exit(1)
    set_clause = ", ".join(f'"{col}" = ?' for col in updates)
    values = list(updates.values()) + [monitor_id]
    error = None
    matched = 0
    with ContainerPaused():
        conn = open_db(readonly=False)
        try:
            cur = conn.execute(f"UPDATE monitor SET {set_clause} WHERE id = ?", values)
            matched = cur.rowcount
            conn.commit()
        except sqlite3.Error as e:
            # e.g. a column that does not exist in the monitor table
            error = str(e)
        finally:
            conn.close()
    # Report after the container has been restarted.
    if error is None and matched == 0:
        error = f"Monitor not found: {monitor_id}"
    if error is not None:
        print(json.dumps({"ok": False, "error": error}))
        sys.exit(1)
    print(json.dumps({"ok": True, "monitorID": monitor_id, "updated": updates}))
def cmd_import(args):
    """Import monitors from a JSON file (same format as --export).

    ``args.file`` must contain a JSON array of monitor objects. Entries
    whose name already exists in the database, or appeared earlier in the
    same file, are skipped. Entries that are not objects or fail to insert
    are counted as failed without aborting the import. Prints a JSON
    summary and exits with status 1 when any entry failed.
    """
    if not os.path.exists(args.file):
        print(f"ERROR: File not found: {args.file}", file=sys.stderr)
        sys.exit(1)
    try:
        with open(args.file, encoding="utf-8") as f:
            monitors = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both invalid JSON and undecodable bytes.
        print(f"ERROR: Could not read JSON from {args.file}: {e}", file=sys.stderr)
        sys.exit(1)
    if not isinstance(monitors, list):
        print("ERROR: Expected a JSON array", file=sys.stderr)
        sys.exit(1)
    added = 0
    skipped = 0
    failed = 0
    with ContainerPaused():
        conn = open_db(readonly=False)
        try:
            existing_names = {r["name"] for r in conn.execute("SELECT name FROM monitor").fetchall()}
            for m in monitors:
                if not isinstance(m, dict):
                    log.error(f"Failed to import non-object entry: {m}")
                    failed += 1
                    continue
                if m.get("name") in existing_names:
                    log.debug(f"Skipping existing: {m['name']}")
                    skipped += 1
                    continue
                try:
                    row = build_monitor_row(m)
                    columns = [k for k, v in row.items() if v is not None]
                    placeholders = ", ".join("?" for _ in columns)
                    values = [row[k] for k in columns]
                    sql = f"INSERT INTO monitor ({', '.join(columns)}) VALUES ({placeholders})"
                    conn.execute(sql, values)
                    added += 1
                    # Remember the name so a duplicate later in the same
                    # file is skipped instead of inserted twice.
                    existing_names.add(row["name"])
                except Exception as e:
                    # Per-entry boundary: record the failure and carry on.
                    log.error(f"Failed to import '{m.get('name', '?')}': {e}")
                    failed += 1
            conn.commit()
        finally:
            conn.close()
    print(json.dumps({"ok": failed == 0, "added": added, "skipped": skipped, "failed": failed}))
    if failed > 0:
        sys.exit(1)
| # --------------------------------------------------------------------------- | |
| # Main | |
| # --------------------------------------------------------------------------- | |
def main():
    """Parse the command line and dispatch to the matching ``cmd_*`` function.

    Exactly one action flag is acted upon; when several are given, the
    first one in the order checked below wins. With no action flag the
    help text is printed. ID flags are parsed as integers, so a
    non-numeric ID is rejected by argparse with a usage error.
    """
    parser = argparse.ArgumentParser(
        description="Uptime Kuma CLI — direct SQLite DB manipulation (stop→write→start)"
    )
    # Keep legacy flat flags for backwards compatibility
    parser.add_argument("--list", action="store_true")
    parser.add_argument("--export", action="store_true")
    parser.add_argument("--add", dest="add_json", metavar="JSON", nargs="?", const="")
    parser.add_argument("--batch-add", dest="batch_json", metavar="JSON", nargs="?", const="")
    parser.add_argument("--delete", dest="delete_id", metavar="ID", type=int)
    parser.add_argument("--delete-name", dest="delete_name", metavar="NAME")
    parser.add_argument("--batch-delete", dest="batch_delete_json", metavar="JSON", nargs="?", const="")
    parser.add_argument("--pause", dest="pause_id", metavar="ID", type=int)
    parser.add_argument("--resume", dest="resume_id", metavar="ID", type=int)
    parser.add_argument("--edit", dest="edit_id", metavar="ID", type=int)
    parser.add_argument("--import", dest="import_file", metavar="FILE")
    parser.add_argument("--json", dest="json_payload", metavar="JSON")
    args = parser.parse_args()

    # Each command receives a small namespace with just the attributes it reads.
    make_args = argparse.Namespace

    if args.list:
        cmd_list(args)
    elif args.export:
        cmd_export(args)
    elif args.add_json is not None:
        # A bare --add (empty string) falls back to --json, then to stdin.
        cmd_add(make_args(json=args.add_json or args.json_payload))
    elif args.batch_json is not None:
        cmd_batch_add(make_args(json=args.batch_json or args.json_payload))
    elif args.delete_id is not None:
        cmd_delete(make_args(id=args.delete_id))
    elif args.delete_name:
        cmd_delete_name(make_args(name=args.delete_name))
    elif args.batch_delete_json is not None:
        cmd_batch_delete(make_args(json=args.batch_delete_json or args.json_payload))
    elif args.pause_id is not None:
        cmd_pause(make_args(id=args.pause_id))
    elif args.resume_id is not None:
        cmd_resume(make_args(id=args.resume_id))
    elif args.edit_id is not None:
        if not args.json_payload:
            print("ERROR: --edit requires --json", file=sys.stderr)
            sys.exit(1)
        cmd_edit(make_args(id=args.edit_id, json=args.json_payload))
    elif args.import_file:
        cmd_import(make_args(file=args.import_file))
    else:
        parser.print_help()
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment