Compact Ollama model updater via the HTTP API

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Compact Ollama model updater via the HTTP API.

Behavior
--------
- Uses Ollama's /api/tags and /api/pull endpoints (no noisy TTY progress).
- If a pull finishes within THRESHOLD seconds: print a single one-line summary per model.
- If it exceeds THRESHOLD seconds: switch to compact streaming:
    * suppress "pulling manifest" spam and summarize it once at the end
    * print layer completion ("done") at most once per digest; ignore mid-progress
    * drop lines that are only "<number>s" (e.g., "13s")
- Prints a final summary bucketed by: updated / skipped / failed / unknown.

Requirements
------------
pip install requests
"""

import argparse
import json
import signal
import sys
import time
from datetime import datetime
from typing import List

import requests

OLLAMA_HOST_DEFAULT = "http://127.0.0.1:11434"
THRESHOLD_DEFAULT = 3.0  # seconds before switching to streaming mode


def list_models(ollama_host: str) -> List[str]:
    """Return a list of installed model names via /api/tags."""
    r = requests.get(f"{ollama_host}/api/tags", timeout=(10, 30))
    r.raise_for_status()
    data = r.json()
    return [m["name"] for m in data.get("models", [])]


def _short_digest(digest: str, n: int = 5) -> str:
    """Return a short printable form of a digest, e.g., 'sha256:abc...' -> 'abc' (n chars)."""
    if not digest:
        return "?????"
    return digest.split(":")[-1][:n]
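
# e.g., _short_digest("sha256:abc123def...") -> "abc12"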


def _fmt_bytes(n: int) -> str:
    """Format an integer byte size into a human-readable string (e.g., 1.1 GB)."""
    try:
        n = int(n)
    except Exception:
        return f"{n} bytes"
    units = ["B", "KB", "MB", "GB", "TB", "PB"]
    i = 0
    f = float(n)
    while f >= 1024.0 and i < len(units) - 1:
        f /= 1024.0
        i += 1
    if i == 0:
        return f"{int(f)} {units[i]}"
    return f"{f:.1f} {units[i]}"


def pull_via_api(model: str, threshold: float, ollama_host: str) -> str:
    """
    Pull a single model via /api/pull and print compact progress.

    Returns
    -------
    str
        One of: "updated", "skipped", "failed", or "unknown".
    """
    start = time.time()
    url = f"{ollama_host}/api/pull"
    # Use (connect_timeout, read_timeout). Large models may stream for a long time.
    with requests.post(
        url,
        json={"name": model, "stream": True},
        stream=True,
        timeout=(10, 1800),
    ) as r:
        if r.status_code != 200:
            print(f">>> {model} ⚠️ failed (HTTP {r.status_code})")
            return "failed"

        events = []
        status_final = None
        printed_stream_header = False
        manifest_count = 0

        # Layer progress compression:
        seen_layers = set()      # all observed digests
        done_layers = set()      # digests whose "done" we already printed
        total_by_layer = {}      # digest -> total bytes (for end-of-stream summary)
        last_status_line = None  # de-duplicate identical status lines

        for raw in r.iter_lines(decode_unicode=True):
            if not raw:
                continue
            try:
                ev = json.loads(raw)
            except json.JSONDecodeError:
                ev = {"status": str(raw)}
            events.append(ev)
            st = (ev.get("status") or "").lower()
            # End-state detection. "up to date" should win: the stream usually
            # ends with a "success" event either way, so don't let a trailing
            # "success" overwrite "skipped".
            if "up to date" in st:
                status_final = "skipped"
            elif "success" in st and status_final is None:
                status_final = "updated"
            # Noise suppression
            if st.startswith("pulling manifest"):
                manifest_count += 1
                continue
            s = st.strip()
            if s.endswith("s") and s[:-1].isdigit():  # e.g., "13s"
                continue

            # Decide when to switch to streaming output
            if (time.time() - start) > threshold and not printed_stream_header:
                print(f">>> {model} ⏳ updating...")
                printed_stream_header = True

            # Streaming mode: print only meaningful, condensed signals
            if printed_stream_header:
                dg = ev.get("digest")
                comp = ev.get("completed")
                total = ev.get("total")
                if dg:
                    # Do not print mid-progress; only print a single "done" per digest.
                    seen_layers.add(dg)
                    if total is not None:
                        total_by_layer[dg] = total
                    if total is not None and comp is not None and comp == total:
                        if dg not in done_layers:
                            done_layers.add(dg)
                            print(f" layer {_short_digest(dg)} done ({_fmt_bytes(total)})")
                    continue
                # Print distinct status lines (avoid duplicates)
                status_line = ev.get("status")
                if status_line and status_line != last_status_line:
                    print(f" {status_line}")
                    last_status_line = status_line

    # Stream ended — print suppressed summaries (streaming case only)
    if printed_stream_header:
        if manifest_count > 0:
            print(f" (pulling manifest x{manifest_count})")
        if seen_layers:
            total_size = sum(v for v in total_by_layer.values() if isinstance(v, int))
            if total_size > 0:
                print(f" (layers: {len(done_layers)}/{len(seen_layers)} done, ~{_fmt_bytes(total_size)})")
            else:
                print(f" (layers: {len(done_layers)}/{len(seen_layers)} done)")

    # Final state resolution if not already determined
    if status_final is None:
        if events:
            last = (events[-1].get("status") or "").lower()
            if "up to date" in last:
                status_final = "skipped"
            elif "success" in last:
                status_final = "updated"
            else:
                status_final = "unknown"
        else:
            status_final = "unknown"

    symbol = {
        "updated": "✅ updated",
        "skipped": "🟢 up to date",
        "failed": "⚠️ failed",
    }.get(status_final, "ℹ️ done")
    if not printed_stream_header:
        # Quick finish: single-line output
        print(f">>> {model} {symbol}")
    else:
        # Streaming case: compact final line
        print(f" {symbol}")
    return status_final
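
# For context, /api/pull with {"stream": true} emits one JSON object per line.
# The event shapes below are assumed from the Ollama API docs and may vary by
# version; the parser above relies only on "status", "digest", "completed",
# and "total":
#   {"status": "pulling manifest"}
#   {"status": "pulling a1b2c...", "digest": "sha256:a1b2c...", "total": 4661211424, "completed": 123456}
#   {"status": "verifying sha256 digest"}
#   {"status": "success"}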


def main() -> None:
    """Entry point: parse args, iterate models, pull, and print the final summary."""
    ap = argparse.ArgumentParser(
        description="Update all downloaded Ollama models (compact, via HTTP API)."
    )
    ap.add_argument(
        "-t", "--threshold",
        type=float,
        default=THRESHOLD_DEFAULT,
        help=f"Seconds before switching to streaming (default: {THRESHOLD_DEFAULT})",
    )
    ap.add_argument(
        "--host",
        default=OLLAMA_HOST_DEFAULT,
        help=f"Ollama host (default: {OLLAMA_HOST_DEFAULT})",
    )
    args = ap.parse_args()
    ollama_host = args.host.rstrip("/")

    # Allow Ctrl-C without breaking the final summary layout.
    interrupted = {"flag": False}

    def _sigint(_sig, _frm):
        interrupted["flag"] = True
        print("\n^C detected — finishing current step and summarizing...", file=sys.stderr)

    signal.signal(signal.SIGINT, _sigint)

    try:
        models = list_models(ollama_host)
    except Exception as e:
        print(f"❌ Failed to list models from {ollama_host}: {e}")
        return
    if not models:
        print("No models found.")
        return

    print(f"🚀 {len(models)} models to check...\n")
    buckets = {"updated": [], "skipped": [], "failed": [], "unknown": []}
    for m in models:
        if interrupted["flag"]:
            print("\n⏹ Stopped by user.\n")
            break
        try:
            status = pull_via_api(m, args.threshold, ollama_host)
        except requests.exceptions.RequestException as e:
            print(f">>> {m} ⚠️ failed (network) — {e}")
            status = "failed"
        buckets.get(status, buckets["unknown"]).append(m)

    print("\n========= Summary =========")
    for key, emoji in [("updated", "✅"), ("skipped", "🟢"), ("failed", "⚠️"), ("unknown", "ℹ️")]:
        print(f"{emoji} {key.capitalize()}: {len(buckets[key])}")
        for name in buckets[key]:
            print(f" - {name}")
        print()
    print(f"🕒 Completed at {datetime.now():%Y-%m-%d %H:%M:%S}")


if __name__ == "__main__":
    main()
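
# Illustrative session (hypothetical model names; the exact lines depend on the
# installed models and on whether a pull crosses the streaming threshold):
#   $ python update_ollama_models.py
#   🚀 2 models to check...
#
#   >>> llama3.2:latest 🟢 up to date
#   >>> qwen2.5:7b ⏳ updating...
#    layer a1b2c done (4.4 GB)
#    (pulling manifest x1)
#    (layers: 1/1 done, ~4.4 GB)
#    ✅ updated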