@rkok
Created March 24, 2026 14:00
Uptime Kuma "API" CLI
#!/usr/bin/env python3
"""
uptime-kuma-cli.py — Manage Uptime Kuma monitors via direct SQLite DB manipulation.
Workflow: stop container → write DB → start container.
Reads (--list, --export) do NOT stop/start the container — DB is opened read-only.
Usage:
python3 uptime-kuma-cli.py --list
python3 uptime-kuma-cli.py --add '{"name": "[vps01] foo.nl", "url": "https://foo.nl"}'
python3 uptime-kuma-cli.py --batch-add '[{"name": "[vps01] foo.nl", "url": "https://foo.nl"}, ...]'
python3 uptime-kuma-cli.py --delete <monitorID>
python3 uptime-kuma-cli.py --delete-name "[vps01] foo.nl"
python3 uptime-kuma-cli.py --batch-delete '[<id>, <id>, ...]'
python3 uptime-kuma-cli.py --pause <monitorID>
python3 uptime-kuma-cli.py --resume <monitorID>
python3 uptime-kuma-cli.py --edit <monitorID> --json '{"name": "New Name"}'
python3 uptime-kuma-cli.py --export # dump all monitors as JSON
python3 uptime-kuma-cli.py --import file.json # import monitors from JSON file
"""
import sys
import json
import os
import argparse
import logging
import sqlite3
import subprocess
import time
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
log = logging.getLogger("kuma-cli")
log.setLevel(logging.DEBUG)
_handler = logging.StreamHandler(sys.stderr)
_handler.setFormatter(logging.Formatter("[%(asctime)s.%(msecs)03d] %(levelname)s %(message)s", datefmt="%H:%M:%S"))
log.addHandler(_handler)
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
KUMA_DB = "/var/lib/docker/volumes/uptime-kuma_uptime-kuma-data/_data/kuma.db"
CONTAINER_NAME = "uptime-kuma"
DEFAULT_USER_ID = 1 # Uptime Kuma's default user ID
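# NOTE: the path above assumes the stock Docker setup with a named volume called
# "uptime-kuma_uptime-kuma-data"; adjust KUMA_DB and CONTAINER_NAME for your own
# deployment. Write commands shell out to `sudo /usr/bin/docker stop|start`, so
# the invoking user needs the corresponding sudo rights.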
# ---------------------------------------------------------------------------
# Docker helpers
# ---------------------------------------------------------------------------
def docker_stop():
    log.debug(f"Stopping container '{CONTAINER_NAME}'...")
    result = subprocess.run(
        ["sudo", "/usr/bin/docker", "stop", CONTAINER_NAME],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        print(f"ERROR: Failed to stop container: {result.stderr.strip()}", file=sys.stderr)
        sys.exit(1)
    # Brief pause to ensure DB is fully flushed
    time.sleep(1)
    log.debug("Container stopped.")
def docker_start():
    log.debug(f"Starting container '{CONTAINER_NAME}'...")
    result = subprocess.run(
        ["sudo", "/usr/bin/docker", "start", CONTAINER_NAME],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        print(f"ERROR: Failed to start container: {result.stderr.strip()}", file=sys.stderr)
        sys.exit(1)
    log.debug("Container started.")
class ContainerPaused:
    """Context manager: stops container on enter, starts on exit (even on error)."""
    def __enter__(self):
        docker_stop()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        docker_start()
        return False  # don't suppress exceptions
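# Why stop/start at all? (assumption about Uptime Kuma's behaviour) The running
# server holds the SQLite file open and keeps monitor state in memory, so rows
# written while it is up may be ignored or overwritten. Wrapping every write in
# ContainerPaused trades a short monitoring gap for a consistent database.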
# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------
def open_db(readonly=False):
    if readonly:
        conn = sqlite3.connect(f"file:{KUMA_DB}?mode=ro", uri=True)
    else:
        conn = sqlite3.connect(KUMA_DB)
        # Setting the journal mode needs write access, so only do it on writable
        # connections; a read-only handle would fail here unless the DB is
        # already in WAL mode.
        conn.execute("PRAGMA journal_mode=WAL")
    conn.row_factory = sqlite3.Row
    return conn
def get_all_monitors(conn):
    cur = conn.execute(
        "SELECT id, name, url, active, type, interval FROM monitor ORDER BY name"
    )
    return [dict(row) for row in cur.fetchall()]
# ---------------------------------------------------------------------------
# Monitor payload builder
# ---------------------------------------------------------------------------
def build_monitor_row(payload):
    """Map user-supplied dict to DB column values."""
    monitor_type = payload.get("type", "http").lower()
    type_map = {"https": "http", "tcp": "port"}
    monitor_type = type_map.get(monitor_type, monitor_type)
    accepted = payload.get("accepted_statuscodes", ["200-299"])
    return {
        "name": payload["name"],
        "active": int(payload.get("active", True)),
        "user_id": payload.get("user_id", DEFAULT_USER_ID),
        "interval": payload.get("interval", 60),
        "url": payload.get("url", ""),
        "type": monitor_type,
        "maxretries": payload.get("maxretries", 3),
        "ignore_tls": int(payload.get("ignoreTls", False)),
        "upside_down": int(payload.get("upsideDown", False)),
        "maxredirects": payload.get("maxredirects", 10),
        "accepted_statuscodes_json": json.dumps(accepted),
        "dns_resolve_type": payload.get("dns_resolve_type", "A"),
        "dns_resolve_server": payload.get("dns_resolve_server", "1.1.1.1"),
        "retry_interval": payload.get("retryInterval", 60),
        "resend_interval": payload.get("resendInterval", 0),
        "method": payload.get("method", "GET"),
        "hostname": payload.get("hostname"),
        "port": payload.get("port"),
        "keyword": payload.get("keyword"),
        "body": payload.get("body"),
        "headers": payload.get("headers"),
        "basic_auth_user": payload.get("basic_auth_user"),
        "basic_auth_pass": payload.get("basic_auth_pass"),
        "description": payload.get("description"),
    }
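# Columns whose value is None are dropped at INSERT time (see cmd_add,
# cmd_batch_add and cmd_import), so the monitor table's own defaults apply to
# anything the payload leaves out. Payload keys follow Uptime Kuma's camelCase
# naming (ignoreTls, retryInterval, ...) where that differs from the snake_case
# column names.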
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
def cmd_list(args):
    """List all monitors (read-only, container stays up)."""
    conn = open_db(readonly=True)
    monitors = get_all_monitors(conn)
    conn.close()
    out = [
        {"id": m["id"], "name": m["name"], "url": m["url"], "active": bool(m["active"])}
        for m in monitors
    ]
    print(json.dumps(out))
def cmd_export(args):
    """Export all monitors as full JSON (read-only)."""
    conn = open_db(readonly=True)
    cur = conn.execute("SELECT * FROM monitor ORDER BY id")
    rows = [dict(row) for row in cur.fetchall()]
    conn.close()
    print(json.dumps(rows, indent=2))
def cmd_add(args):
    raw = args.json or sys.stdin.read().strip()
    if not raw:
        print("ERROR: No input provided", file=sys.stderr)
        sys.exit(1)
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as e:
        print(f"ERROR: Invalid JSON: {e}", file=sys.stderr)
        sys.exit(1)
    if not payload.get("name"):
        print("ERROR: 'name' is required", file=sys.stderr)
        sys.exit(1)
    row = build_monitor_row(payload)
    columns = [k for k, v in row.items() if v is not None]
    placeholders = ", ".join("?" for _ in columns)
    values = [row[k] for k in columns]
    sql = f"INSERT INTO monitor ({', '.join(columns)}) VALUES ({placeholders})"
    with ContainerPaused():
        conn = open_db(readonly=False)
        cur = conn.execute(sql, values)
        new_id = cur.lastrowid
        conn.commit()
        conn.close()
    out = {"ok": True, "monitorID": new_id, "name": row["name"], "url": row.get("url", "")}
    print(json.dumps(out))
def cmd_batch_add(args):
    """Add multiple monitors in one container stop/start cycle."""
    raw = args.json or sys.stdin.read().strip()
    if not raw:
        print("ERROR: No input provided", file=sys.stderr)
        sys.exit(1)
    try:
        monitors = json.loads(raw)
    except json.JSONDecodeError as e:
        print(f"ERROR: Invalid JSON: {e}", file=sys.stderr)
        sys.exit(1)
    if not isinstance(monitors, list) or not monitors:
        print("ERROR: Expected a non-empty JSON array", file=sys.stderr)
        sys.exit(1)
    results = []
    added = 0
    failed = 0
    with ContainerPaused():
        conn = open_db(readonly=False)
        for m in monitors:
            if not m.get("name"):
                log.warning(f"Skipping entry without 'name': {m}")
                failed += 1
                results.append({"name": m.get("name", "?"), "ok": False, "error": "missing name"})
                continue
            try:
                row = build_monitor_row(m)
                columns = [k for k, v in row.items() if v is not None]
                placeholders = ", ".join("?" for _ in columns)
                values = [row[k] for k in columns]
                sql = f"INSERT INTO monitor ({', '.join(columns)}) VALUES ({placeholders})"
                cur = conn.execute(sql, values)
                new_id = cur.lastrowid
                results.append({"name": m["name"], "url": m.get("url", ""), "ok": True, "monitorID": new_id})
                added += 1
                log.debug(f"Inserted '{m['name']}' → id={new_id}")
            except Exception as e:
                log.error(f"Failed to insert '{m.get('name', '?')}': {e}")
                failed += 1
                results.append({"name": m.get("name", "?"), "url": m.get("url", ""), "ok": False, "error": str(e)})
        conn.commit()
        conn.close()
    out = {"ok": failed == 0, "added": added, "failed": failed, "results": results}
    print(json.dumps(out))
    if failed > 0:
        sys.exit(1)
def cmd_delete(args):
    monitor_id = int(args.id)
    with ContainerPaused():
        conn = open_db(readonly=False)
        # Also clean up related tables
        conn.execute("DELETE FROM monitor_notification WHERE monitor_id = ?", (monitor_id,))
        conn.execute("DELETE FROM monitor_tag WHERE monitor_id = ?", (monitor_id,))
        conn.execute("DELETE FROM monitor_group WHERE monitor_id = ?", (monitor_id,))
        conn.execute("DELETE FROM heartbeat WHERE monitor_id = ?", (monitor_id,))
        conn.execute("DELETE FROM monitor WHERE id = ?", (monitor_id,))
        conn.commit()
        conn.close()
    print(json.dumps({"ok": True, "deleted": monitor_id}))
def cmd_delete_name(args):
    """Delete a monitor by exact name (useful when ID is unknown)."""
    name = args.name
    with ContainerPaused():
        conn = open_db(readonly=False)
        cur = conn.execute("SELECT id FROM monitor WHERE name = ?", (name,))
        row = cur.fetchone()
        if not row:
            conn.close()
            print(json.dumps({"ok": False, "error": f"Monitor not found: {name}"}))
            sys.exit(1)
        monitor_id = row["id"]
        conn.execute("DELETE FROM monitor_notification WHERE monitor_id = ?", (monitor_id,))
        conn.execute("DELETE FROM monitor_tag WHERE monitor_id = ?", (monitor_id,))
        conn.execute("DELETE FROM monitor_group WHERE monitor_id = ?", (monitor_id,))
        conn.execute("DELETE FROM heartbeat WHERE monitor_id = ?", (monitor_id,))
        conn.execute("DELETE FROM monitor WHERE id = ?", (monitor_id,))
        conn.commit()
        conn.close()
    print(json.dumps({"ok": True, "deleted": monitor_id, "name": name}))
def cmd_batch_delete(args):
    """Delete multiple monitors by ID in one container stop/start cycle."""
    raw = args.json or sys.stdin.read().strip()
    try:
        ids = json.loads(raw)
    except json.JSONDecodeError as e:
        print(f"ERROR: Invalid JSON: {e}", file=sys.stderr)
        sys.exit(1)
    if not isinstance(ids, list):
        print("ERROR: Expected a JSON array of IDs", file=sys.stderr)
        sys.exit(1)
    deleted = []
    failed = []
    with ContainerPaused():
        conn = open_db(readonly=False)
        for mid in ids:
            try:
                mid = int(mid)
                conn.execute("DELETE FROM monitor_notification WHERE monitor_id = ?", (mid,))
                conn.execute("DELETE FROM monitor_tag WHERE monitor_id = ?", (mid,))
                conn.execute("DELETE FROM monitor_group WHERE monitor_id = ?", (mid,))
                conn.execute("DELETE FROM heartbeat WHERE monitor_id = ?", (mid,))
                conn.execute("DELETE FROM monitor WHERE id = ?", (mid,))
                deleted.append(mid)
            except Exception as e:
                log.error(f"Failed to delete id={mid}: {e}")
                failed.append({"id": mid, "error": str(e)})
        conn.commit()
        conn.close()
    out = {"ok": len(failed) == 0, "deleted": deleted, "failed": failed}
    print(json.dumps(out))
    if failed:
        sys.exit(1)
def cmd_pause(args):
    monitor_id = int(args.id)
    with ContainerPaused():
        conn = open_db(readonly=False)
        conn.execute("UPDATE monitor SET active = 0 WHERE id = ?", (monitor_id,))
        conn.commit()
        conn.close()
    print(json.dumps({"ok": True, "paused": monitor_id}))
def cmd_resume(args):
    monitor_id = int(args.id)
    with ContainerPaused():
        conn = open_db(readonly=False)
        conn.execute("UPDATE monitor SET active = 1 WHERE id = ?", (monitor_id,))
        conn.commit()
        conn.close()
    print(json.dumps({"ok": True, "resumed": monitor_id}))
def cmd_edit(args):
    monitor_id = int(args.id)
    try:
        payload = json.loads(args.json)
    except json.JSONDecodeError as e:
        print(f"ERROR: Invalid JSON: {e}", file=sys.stderr)
        sys.exit(1)
    # Map friendly keys to DB columns
    field_map = {
        "name": "name", "url": "url", "active": "active",
        "interval": "interval", "type": "type", "maxretries": "maxretries",
        "ignoreTls": "ignore_tls", "upsideDown": "upside_down",
        "maxredirects": "maxredirects", "retryInterval": "retry_interval",
        "resendInterval": "resend_interval", "method": "method",
        "hostname": "hostname", "port": "port", "keyword": "keyword",
        "description": "description",
    }
    updates = {}
    for user_key, db_col in field_map.items():
        if user_key in payload:
            val = payload[user_key]
            if isinstance(val, bool):
                val = int(val)
            updates[db_col] = val
    # Direct DB column names also accepted
    for k, v in payload.items():
        if k not in field_map and k not in updates:
            updates[k] = v
    if not updates:
        print("ERROR: No valid fields to update", file=sys.stderr)
        sys.exit(1)
    set_clause = ", ".join(f"{col} = ?" for col in updates)
    values = list(updates.values()) + [monitor_id]
    with ContainerPaused():
        conn = open_db(readonly=False)
        conn.execute(f"UPDATE monitor SET {set_clause} WHERE id = ?", values)
        conn.commit()
        conn.close()
    print(json.dumps({"ok": True, "monitorID": monitor_id, "updated": updates}))
def cmd_import(args):
    """Import monitors from a JSON file (same format as --export)."""
    if not os.path.exists(args.file):
        print(f"ERROR: File not found: {args.file}", file=sys.stderr)
        sys.exit(1)
    with open(args.file) as f:
        monitors = json.load(f)
    if not isinstance(monitors, list):
        print("ERROR: Expected a JSON array", file=sys.stderr)
        sys.exit(1)
    added = 0
    skipped = 0
    failed = 0
    with ContainerPaused():
        conn = open_db(readonly=False)
        existing_names = {r["name"] for r in conn.execute("SELECT name FROM monitor").fetchall()}
        for m in monitors:
            if m.get("name") in existing_names:
                log.debug(f"Skipping existing: {m['name']}")
                skipped += 1
                continue
            try:
                row = build_monitor_row(m)
                columns = [k for k, v in row.items() if v is not None]
                placeholders = ", ".join("?" for _ in columns)
                values = [row[k] for k in columns]
                sql = f"INSERT INTO monitor ({', '.join(columns)}) VALUES ({placeholders})"
                conn.execute(sql, values)
                added += 1
            except Exception as e:
                log.error(f"Failed to import '{m.get('name', '?')}': {e}")
                failed += 1
        conn.commit()
        conn.close()
    print(json.dumps({"ok": failed == 0, "added": added, "skipped": skipped, "failed": failed}))
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    parser = argparse.ArgumentParser(
        description="Uptime Kuma CLI — direct SQLite DB manipulation (stop→write→start)"
    )
    sub = parser.add_subparsers(dest="cmd")
    # Keep legacy flat flags for backwards compatibility
    parser.add_argument("--list", action="store_true")
    parser.add_argument("--export", action="store_true")
    parser.add_argument("--add", dest="add_json", metavar="JSON", nargs="?", const="")
    parser.add_argument("--batch-add", dest="batch_json", metavar="JSON", nargs="?", const="")
    parser.add_argument("--delete", dest="delete_id", metavar="ID")
    parser.add_argument("--delete-name", dest="delete_name", metavar="NAME")
    parser.add_argument("--batch-delete", dest="batch_delete_json", metavar="JSON", nargs="?", const="")
    parser.add_argument("--pause", dest="pause_id", metavar="ID")
    parser.add_argument("--resume", dest="resume_id", metavar="ID")
    parser.add_argument("--edit", dest="edit_id", metavar="ID")
    parser.add_argument("--import", dest="import_file", metavar="FILE")
    parser.add_argument("--json", dest="json_payload", metavar="JSON")
    args = parser.parse_args()

    if args.list:
        cmd_list(args)
    elif args.export:
        cmd_export(args)
    elif args.add_json is not None:
        class A:
            json = args.add_json or args.json_payload
        cmd_add(A())
    elif args.batch_json is not None:
        class A:
            json = args.batch_json or args.json_payload
        cmd_batch_add(A())
    elif args.delete_id:
        class A:
            id = args.delete_id
        cmd_delete(A())
    elif args.delete_name:
        class A:
            name = args.delete_name
        cmd_delete_name(A())
    elif args.batch_delete_json is not None:
        class A:
            json = args.batch_delete_json or args.json_payload
        cmd_batch_delete(A())
    elif args.pause_id:
        class A:
            id = args.pause_id
        cmd_pause(A())
    elif args.resume_id:
        class A:
            id = args.resume_id
        cmd_resume(A())
    elif args.edit_id:
        if not args.json_payload:
            print("ERROR: --edit requires --json", file=sys.stderr)
            sys.exit(1)
        class A:
            id = args.edit_id
            json = args.json_payload
        cmd_edit(A())
    elif args.import_file:
        class A:
            file = args.import_file
        cmd_import(A())
    else:
        parser.print_help()


if __name__ == "__main__":
    main()