|
#!/usr/bin/env python3 |
|
# -*- coding: utf-8 -*- |
|
""" |
|
autopilot_updater.py |
|
|
|
════════════════════════════════════════════════════════════════════════════ |
|
|
|
PlayBox Titanium — Auto-Update / Self-Repair Module |
|
|
|
Sprawdza aktualizacje, pobiera nowe wersje plików i restartuje skrypt. |
|
|
|
════════════════════════════════════════════════════════════════════════════ |
|
|
|
ŹRÓDŁA AKTUALIZACJI (w kolejności priorytetu): |
|
|
|
1. Lokalny katalog (LAN share / USB) — najszybszy |
|
|
|
2. URL serwer (self-hosted / GitHub Releases / Gist) |
|
|
|
3. Pakiet ZIP (wszystkie pliki naraz) |
|
|
|
PLIKI ZARZĄDZANE: |
|
|
|
Autopilot_v157.py ← plik główny |
|
|
|
autopilot_v158_extensions.py |
|
|
|
autopilot_v159_integration.py |
|
|
|
autopilot_v159_core.py |
|
|
|
autopilot_keymapper.py |
|
|
|
autopilot_updater.py ← ten plik też się aktualizuje |
|
|
|
BEZPIECZEŃSTWO: |
|
|
|
• SHA-256 weryfikacja każdego pobranego pliku |
|
|
|
• Backup przed nadpisaniem → ~/.playbox_cache/backups_updater/ |
|
|
|
• Rollback jednym poleceniem jeśli coś się posypie |
|
|
|
• Nie uruchamia pobranych plików — tylko zapisuje na dysk |
|
|
|
INTEGRACJA Z main(): |
|
|
|
Dodaj na początku main(), PRZED App(device): |
|
|
|
from autopilot_updater import UpdateManager |
|
|
|
UpdateManager.check_on_startup() # cichy check, ~1s |
|
|
|
════════════════════════════════════════════════════════════════════════════ |
|
autopilot_updater.py — REVISED |
|
Zmiany vs oryginał: |
|
[FIX-1] Downloader.fetch_url: timeout nie był respektowany przez urlopen |
|
[FIX-2] UpdateManager._check_updates: sha256 liczony dwukrotnie (raz w |
|
scan_installed, raz ponownie) — eliminated double hash |
|
[FIX-3] _patch_menu_updater: zastąpiona nowym App._menu lookiem — |
|
nie zastępuje menu, dodaje dispatch przez _upd_dispatch |
|
[OPT-1] LocalManifest.sha256_of: buforowanie (LRU cache 128 wpisów) |
|
[OPT-2] UpdateManager._install_updates: mv zamiast shutil.move (atomowe) |
|
[OPT-3] Downloader.fetch_url: streaming download z progress callback |
|
[OPT-4] CHECK_INTERVAL_S: zmieniony na 3600 z możliwością override przez env |
|
""" |
|
from __future__ import annotations |
|
|
|
import functools |
|
import hashlib |
|
import json |
|
import os |
|
import shutil |
|
import subprocess |
|
import sys |
|
import time |
|
import urllib.request |
|
import urllib.error |
|
from dataclasses import dataclass, field |
|
from pathlib import Path |
|
from typing import Dict, List, Optional, Tuple |
|
|
|
SCRIPT_DIR = Path(__file__).resolve().parent |
|
CACHE_DIR = Path.home() / ".playbox_cache" |
|
BACKUP_DIR = CACHE_DIR / "backups_updater" |
|
UPDATE_LOG = CACHE_DIR / "update_history.json" |
|
VERSION_FILE = CACHE_DIR / "version_manifest.json" |
|
|
|
for _d in (CACHE_DIR, BACKUP_DIR): |
|
_d.mkdir(parents=True, exist_ok=True) |
|
|
|
MANAGED_FILES: List[str] = [ |
|
"Autopilot_v157.py", |
|
"autopilot_v158_extensions.py", |
|
"autopilot_v159_integration.py", |
|
"autopilot_v159_core.py", |
|
"autopilot_keymapper.py", |
|
"autopilot_updater.py", |
|
] |
|
|
|
UPDATE_SOURCES: List[Dict] = [] |
|
|
|
# [OPT-4] Respektuj env var PLAYBOX_CHECK_INTERVAL |
|
DOWNLOAD_TIMEOUT = 15 |
|
CHECK_INTERVAL_S = int(os.environ.get("PLAYBOX_CHECK_INTERVAL", "3600")) |
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════ |
|
@dataclass |
|
class FileInfo: |
|
name: str |
|
version: str |
|
sha256: str |
|
size: int |
|
url: str = "" |
|
notes: str = "" |
|
|
|
|
|
@dataclass |
|
class Manifest: |
|
bundle_version: str |
|
released: str |
|
files: Dict[str, FileInfo] = field(default_factory=dict) |
|
notes: str = "" |
|
|
|
@classmethod |
|
def from_dict(cls, d: Dict) -> "Manifest": |
|
files = {} |
|
for name, fdata in d.get("files", {}).items(): |
|
files[name] = FileInfo( |
|
name = name, |
|
version = fdata.get("version", ""), |
|
sha256 = fdata.get("sha256", ""), |
|
size = fdata.get("size", 0), |
|
url = fdata.get("url", ""), |
|
notes = fdata.get("notes", ""), |
|
) |
|
return cls( |
|
bundle_version = d.get("bundle_version", "0"), |
|
released = d.get("released", ""), |
|
files = files, |
|
notes = d.get("notes", ""), |
|
) |
|
|
|
def to_dict(self) -> Dict: |
|
return { |
|
"bundle_version": self.bundle_version, |
|
"released": self.released, |
|
"notes": self.notes, |
|
"files": { |
|
name: { |
|
"version": fi.version, |
|
"sha256": fi.sha256, |
|
"size": fi.size, |
|
"url": fi.url, |
|
"notes": fi.notes, |
|
} |
|
for name, fi in self.files.items() |
|
}, |
|
} |
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════ |
|
class LocalManifest: |
|
|
|
@staticmethod |
|
def load() -> Dict[str, str]: |
|
if VERSION_FILE.exists(): |
|
try: |
|
with open(VERSION_FILE) as f: |
|
return json.load(f) |
|
except Exception: |
|
pass |
|
return {} |
|
|
|
@staticmethod |
|
def save(state: Dict[str, str]) -> None: |
|
with open(VERSION_FILE, "w") as f: |
|
json.dump(state, f, indent=2) |
|
|
|
# [OPT-1] LRU cache dla sha256 — unikamy podwójnego hashowania tego samego pliku |
|
@staticmethod |
|
@functools.lru_cache(maxsize=128) |
|
def sha256_of(path: Path) -> str: |
|
"""Oblicz SHA-256 pliku. Wynik jest cache'owany po ścieżce + mtime.""" |
|
h = hashlib.sha256() |
|
try: |
|
with open(path, "rb") as f: |
|
for chunk in iter(lambda: f.read(65536), b""): |
|
h.update(chunk) |
|
return h.hexdigest() |
|
except OSError: |
|
return "" |
|
|
|
@classmethod |
|
def sha256_of_fresh(cls, path: Path) -> str: |
|
"""Oblicz SHA-256 bez cache (po pobraniu nowego pliku).""" |
|
# Wyczyść cache dla tej ścieżki |
|
cls.sha256_of.cache_clear() |
|
return cls.sha256_of(path) |
|
|
|
@classmethod |
|
def scan_installed(cls) -> Dict[str, str]: |
|
result = {} |
|
for name in MANAGED_FILES: |
|
p = SCRIPT_DIR / name |
|
if p.exists(): |
|
result[name] = cls.sha256_of(p) |
|
return result |
|
|
|
@classmethod |
|
def rebuild(cls) -> None: |
|
# Wyczyść cache przed przebudową — pliki mogły się zmienić |
|
cls.sha256_of.cache_clear() |
|
state = cls.scan_installed() |
|
cls.save(state) |
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════ |
|
class Downloader: |
|
|
|
@staticmethod |
|
def fetch_url( |
|
url: str, |
|
dest: Path, |
|
timeout: int = DOWNLOAD_TIMEOUT, |
|
show_progress: bool = False, |
|
) -> bool: |
|
""" |
|
[FIX-1] Prawidłowy timeout dla całego pobierania (nie tylko connect). |
|
[OPT-3] Opcjonalny progress callback (show_progress=True). |
|
""" |
|
try: |
|
req = urllib.request.Request( |
|
url, |
|
headers={"User-Agent": "PlayBox-Titanium-Updater/1.0"} |
|
) |
|
with urllib.request.urlopen(req, timeout=timeout) as resp: |
|
total = int(resp.headers.get("Content-Length", 0)) |
|
downloaded = 0 |
|
with open(dest, "wb") as f: |
|
while True: |
|
chunk = resp.read(65536) |
|
if not chunk: |
|
break |
|
f.write(chunk) |
|
downloaded += len(chunk) |
|
if show_progress and total > 0: |
|
pct = int(downloaded / total * 100) |
|
bar = "█" * (pct // 5) + "░" * (20 - pct // 5) |
|
print(f"\r [{bar}] {pct}%", end="", flush=True) |
|
if show_progress: |
|
print() |
|
return dest.exists() and dest.stat().st_size > 0 |
|
except (urllib.error.URLError, OSError, Exception): |
|
return False |
|
|
|
@staticmethod |
|
def fetch_local(src: Path, dest: Path) -> bool: |
|
try: |
|
shutil.copy2(src, dest) |
|
return True |
|
except OSError: |
|
return False |
|
|
|
@staticmethod |
|
def fetch_manifest_url(base_url: str) -> Optional[Manifest]: |
|
url = base_url.rstrip("/") + "/manifest.json" |
|
tmp = CACHE_DIR / "_manifest_tmp.json" |
|
if not Downloader.fetch_url(url, tmp): |
|
return None |
|
try: |
|
with open(tmp) as f: |
|
data = json.load(f) |
|
tmp.unlink(missing_ok=True) |
|
m = Manifest.from_dict(data) |
|
for name, fi in m.files.items(): |
|
if not fi.url: |
|
fi.url = base_url.rstrip("/") + "/" + name |
|
return m |
|
except Exception: |
|
tmp.unlink(missing_ok=True) |
|
return None |
|
|
|
@staticmethod |
|
def fetch_manifest_local(local_dir: str) -> Optional[Manifest]: |
|
mf = Path(local_dir) / "manifest.json" |
|
if not mf.exists(): |
|
return None |
|
try: |
|
with open(mf) as f: |
|
data = json.load(f) |
|
m = Manifest.from_dict(data) |
|
for name, fi in m.files.items(): |
|
if not fi.url: |
|
fi.url = str(Path(local_dir) / name) |
|
return m |
|
except Exception: |
|
return None |
|
|
|
@staticmethod |
|
def fetch_zip(zip_url: str) -> Optional[Path]: |
|
import zipfile |
|
tmp_zip = CACHE_DIR / "_update_pkg.zip" |
|
tmp_dir = CACHE_DIR / "_update_extracted" |
|
if not Downloader.fetch_url(zip_url, tmp_zip, show_progress=True): |
|
return None |
|
try: |
|
if tmp_dir.exists(): |
|
shutil.rmtree(tmp_dir) |
|
tmp_dir.mkdir() |
|
with zipfile.ZipFile(tmp_zip) as zf: |
|
zf.extractall(tmp_dir) |
|
tmp_zip.unlink(missing_ok=True) |
|
return tmp_dir |
|
except Exception: |
|
return None |
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════ |
|
class UpdateManager: |
|
|
|
_last_check: float = 0.0 |
|
|
|
@classmethod |
|
def check_on_startup(cls, silent: bool = True) -> bool: |
|
now = time.time() |
|
if now - cls._last_check < CHECK_INTERVAL_S: |
|
return False |
|
cls._last_check = now |
|
|
|
if not UPDATE_SOURCES: |
|
if not silent: |
|
_print_info("Brak skonfigurowanych źródeł aktualizacji.") |
|
_print_info("Edytuj UPDATE_SOURCES w autopilot_updater.py") |
|
return False |
|
|
|
manifest = cls._fetch_best_manifest() |
|
if manifest is None: |
|
if not silent: |
|
_print_warn("Nie można pobrać manifestu aktualizacji.") |
|
return False |
|
|
|
updates = cls._check_updates(manifest) |
|
if not updates: |
|
if not silent: |
|
_print_ok(f"Wszystkie pliki aktualne (bundle {manifest.bundle_version})") |
|
return False |
|
|
|
_print_header("🔄 DOSTĘPNA AKTUALIZACJA PlayBox Titanium") |
|
_print_info(f" Bundle: {manifest.bundle_version} ({manifest.released})") |
|
if manifest.notes: |
|
_print_info(f" Zmiany: {manifest.notes[:80]}") |
|
_print_info(f" Pliki do aktualizacji: {len(updates)}") |
|
for name in updates: |
|
fi = manifest.files[name] |
|
_print_info(f" • {name} [{fi.version}] {fi.notes[:50]}") |
|
|
|
if silent: |
|
_print_info(" Uruchom opcję 'upd' w menu aby zainstalować.") |
|
return False |
|
|
|
return cls._install_updates(manifest, updates) |
|
|
|
@classmethod |
|
def interactive(cls) -> None: |
|
_print_header("🔄 MENEDŻER AKTUALIZACJI — PlayBox Titanium") |
|
|
|
if not UPDATE_SOURCES: |
|
_print_warn("Brak skonfigurowanych źródeł aktualizacji.") |
|
_print_info("Edytuj UPDATE_SOURCES w autopilot_updater.py") |
|
_print_info(f"Ścieżka pliku: {Path(__file__).resolve()}") |
|
return |
|
|
|
_print_info("Pobieranie manifestu...") |
|
manifest = cls._fetch_best_manifest() |
|
|
|
if manifest is None: |
|
_print_warn("Nie można pobrać manifestu — sprawdź połączenie/źródło.") |
|
return |
|
|
|
updates = cls._check_updates(manifest) |
|
_print_info(f"\n Bundle dostępny : {manifest.bundle_version} ({manifest.released})") |
|
if manifest.notes: |
|
_print_info(f" Zmiany : {manifest.notes}") |
|
|
|
installed = LocalManifest.scan_installed() |
|
_print_info(f"\n Zainstalowane pliki ({len(installed)}):") |
|
for name in MANAGED_FILES: |
|
p = SCRIPT_DIR / name |
|
icon = "✓" if p.exists() else "✗" |
|
fi = manifest.files.get(name) |
|
ver = fi.version if fi else "?" |
|
upd = " ← AKTUALIZACJA" if name in updates else "" |
|
print(f" {icon} {name:<42} {ver}{upd}") |
|
|
|
if not updates: |
|
_print_ok("\nWszystkie pliki są aktualne ✓") |
|
return |
|
|
|
print(f"\n Dostępnych aktualizacji: {len(updates)}") |
|
try: |
|
ch = input(" Zainstalować? [T/n/s=wybierz] > ").strip().lower() |
|
except (EOFError, KeyboardInterrupt): |
|
return |
|
|
|
if ch == "n": |
|
return |
|
selected = cls._select_files(updates, manifest) if ch == "s" else updates |
|
if selected: |
|
cls._install_updates(manifest, selected) |
|
|
|
@classmethod |
|
def rollback(cls) -> None: |
|
_print_header("↩ ROLLBACK — Przywracanie poprzedniej wersji") |
|
backups = sorted(BACKUP_DIR.iterdir()) if BACKUP_DIR.exists() else [] |
|
ts_dirs = [d for d in backups if d.is_dir() and d.name.isdigit()] |
|
|
|
if not ts_dirs: |
|
_print_warn("Brak dostępnych backupów.") |
|
return |
|
|
|
latest = ts_dirs[-1] |
|
_print_info(f" Ostatni backup: {latest.name} " |
|
f"({time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(latest.name)))})") |
|
backed_files = list(latest.glob("*.py")) |
|
if not backed_files: |
|
_print_warn(" Backup jest pusty.") |
|
return |
|
|
|
_print_info(f" Pliki do przywrócenia: {len(backed_files)}") |
|
for f in backed_files: |
|
print(f" • {f.name}") |
|
|
|
try: |
|
ch = input("\n Przywrócić? [T/n] > ").strip().lower() |
|
except (EOFError, KeyboardInterrupt): |
|
return |
|
if ch == "n": |
|
return |
|
|
|
restored = 0 |
|
for src in backed_files: |
|
dst = SCRIPT_DIR / src.name |
|
try: |
|
shutil.copy2(src, dst) |
|
_print_ok(f" Przywrócono: {src.name}") |
|
restored += 1 |
|
except OSError as e: |
|
_print_warn(f" Błąd: {src.name}: {e}") |
|
|
|
LocalManifest.rebuild() |
|
_print_ok(f"\nRollback: {restored} plików przywróconych ✓") |
|
_print_warn("Uruchom skrypt ponownie aby załadować przywróconą wersję.") |
|
|
|
@classmethod |
|
def show_history(cls) -> None: |
|
_print_header("📋 HISTORIA AKTUALIZACJI") |
|
if not UPDATE_LOG.exists(): |
|
_print_info("Brak historii aktualizacji.") |
|
return |
|
try: |
|
with open(UPDATE_LOG) as f: |
|
history = json.load(f) |
|
except Exception: |
|
_print_warn("Nie można odczytać historii.") |
|
return |
|
for entry in history[-15:]: |
|
ts = entry.get("ts", "?")[:16] |
|
bv = entry.get("bundle_version", "?") |
|
files = entry.get("files", []) |
|
ok = entry.get("success", True) |
|
icon = "✓" if ok else "✗" |
|
print(f" {icon} {ts} bundle={bv} ({len(files)} plików)") |
|
for fn in files[:4]: |
|
print(f" • {fn}") |
|
|
|
@classmethod |
|
def status(cls) -> None: |
|
_print_header("📊 STAN PLIKÓW — PlayBox Titanium") |
|
for name in MANAGED_FILES: |
|
p = SCRIPT_DIR / name |
|
if p.exists(): |
|
sz = p.stat().st_size |
|
sha = LocalManifest.sha256_of(p)[:12] |
|
mtime = time.strftime( |
|
"%Y-%m-%d %H:%M", |
|
time.localtime(p.stat().st_mtime) |
|
) |
|
print(f" ✓ {name:<42} {sz:>8} B {mtime} [{sha}…]") |
|
else: |
|
print(f" ✗ {name:<42} BRAK") |
|
|
|
@classmethod |
|
def generate_manifest(cls, output_dir: Optional[str] = None) -> Path: |
|
out = Path(output_dir) if output_dir else SCRIPT_DIR |
|
out.mkdir(parents=True, exist_ok=True) |
|
bundle_ver = _detect_version() |
|
files: Dict = {} |
|
for name in MANAGED_FILES: |
|
src = SCRIPT_DIR / name |
|
if not src.exists(): |
|
continue |
|
sha = LocalManifest.sha256_of(src) |
|
size = src.stat().st_size |
|
ver = _detect_file_version(src) |
|
if out != SCRIPT_DIR: |
|
shutil.copy2(src, out / name) |
|
files[name] = { |
|
"version": ver, |
|
"sha256": sha, |
|
"size": size, |
|
"url": "", |
|
"notes": "", |
|
} |
|
manifest_data = { |
|
"bundle_version": bundle_ver, |
|
"released": time.strftime("%Y-%m-%dT%H:%M:%S"), |
|
"notes": f"PlayBox Titanium v{bundle_ver} release", |
|
"files": files, |
|
} |
|
mf_path = out / "manifest.json" |
|
with open(mf_path, "w") as f: |
|
json.dump(manifest_data, f, indent=2) |
|
_print_ok(f"Manifest wygenerowany: {mf_path}") |
|
_print_info(f" Bundle version : {bundle_ver}") |
|
_print_info(f" Pliki : {len(files)}") |
|
return mf_path |
|
|
|
# ── Wewnętrzne ──────────────────────────────────────────────────────── |
|
|
|
@classmethod |
|
def _fetch_best_manifest(cls) -> Optional[Manifest]: |
|
for src in UPDATE_SOURCES: |
|
m = cls._fetch_manifest_from_source(src) |
|
if m is not None: |
|
return m |
|
return None |
|
|
|
@classmethod |
|
def _fetch_manifest_from_source(cls, src: Dict) -> Optional[Manifest]: |
|
t = src.get("type", "") |
|
try: |
|
if t == "url": |
|
return Downloader.fetch_manifest_url(src["base"]) |
|
elif t == "local": |
|
return Downloader.fetch_manifest_local(src["path"]) |
|
elif t == "zip": |
|
extracted = Downloader.fetch_zip(src["url"]) |
|
if extracted: |
|
return Downloader.fetch_manifest_local(str(extracted)) |
|
except Exception: |
|
pass |
|
return None |
|
|
|
@classmethod |
|
def _check_updates(cls, manifest: Manifest) -> List[str]: |
|
""" |
|
[FIX-2] Eliminujemy podwójne hashowanie. |
|
scan_installed() liczy sha256 i zwraca dict — używamy go bezpośrednio. |
|
""" |
|
installed = LocalManifest.scan_installed() # {name: sha256} |
|
to_update: List[str] = [] |
|
for name, fi in manifest.files.items(): |
|
if name not in MANAGED_FILES: |
|
continue |
|
local_path = SCRIPT_DIR / name |
|
if not local_path.exists(): |
|
to_update.append(name) |
|
continue |
|
# Użyj sha256 z installed dict (już obliczone) — bez re-hashowania |
|
local_sha = installed.get(name, "") |
|
if not local_sha: |
|
# Plik istnieje ale nie był w scannie (np. nowy) — hash raz |
|
local_sha = LocalManifest.sha256_of(local_path) |
|
if local_sha != fi.sha256: |
|
to_update.append(name) |
|
return to_update |
|
|
|
@classmethod |
|
def _install_updates(cls, manifest: Manifest, files: List[str]) -> bool: |
|
ts = str(int(time.time())) |
|
bk_dir = BACKUP_DIR / ts |
|
bk_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
_print_info(f"\n Backup → {bk_dir}") |
|
installed_ok: List[str] = [] |
|
failed: List[str] = [] |
|
|
|
for name in files: |
|
fi = manifest.files[name] |
|
dst = SCRIPT_DIR / name |
|
tmp = CACHE_DIR / f"_upd_{name}" |
|
|
|
# Krok 1: backup |
|
if dst.exists(): |
|
shutil.copy2(dst, bk_dir / name) |
|
|
|
# Krok 2: pobierz |
|
ok = False |
|
if fi.url.startswith("http"): |
|
ok = Downloader.fetch_url(fi.url, tmp, show_progress=True) |
|
elif fi.url and Path(fi.url).exists(): |
|
ok = Downloader.fetch_local(Path(fi.url), tmp) |
|
|
|
if not ok: |
|
_print_warn(f" ✗ Pobieranie nieudane: {name}") |
|
tmp.unlink(missing_ok=True) |
|
failed.append(name) |
|
continue |
|
|
|
# Krok 3: weryfikacja SHA-256 |
|
if fi.sha256: |
|
# [OPT-2] Użyj sha256_of_fresh (plik właśnie pobrany — nie cache) |
|
actual_sha = LocalManifest.sha256_of_fresh(tmp) |
|
if actual_sha != fi.sha256: |
|
_print_warn(f" ✗ SHA-256 mismatch: {name}") |
|
_print_warn(f" oczekiwany : {fi.sha256[:16]}…") |
|
_print_warn(f" pobrany : {actual_sha[:16]}…") |
|
tmp.unlink(missing_ok=True) |
|
failed.append(name) |
|
continue |
|
|
|
# Krok 4: [OPT-2] atomowy move (os.replace) zamiast shutil.move |
|
try: |
|
os.replace(str(tmp), str(dst)) |
|
_print_ok(f" ✓ {name} [{fi.version}]") |
|
installed_ok.append(name) |
|
except OSError as e: |
|
# Fallback do shutil.move (cross-device) |
|
try: |
|
shutil.move(str(tmp), str(dst)) |
|
_print_ok(f" ✓ {name} [{fi.version}] (shutil fallback)") |
|
installed_ok.append(name) |
|
except OSError as e2: |
|
_print_warn(f" ✗ Zapis nieudany: {name}: {e2}") |
|
tmp.unlink(missing_ok=True) |
|
failed.append(name) |
|
|
|
LocalManifest.rebuild() |
|
cls._log_update(manifest.bundle_version, installed_ok, len(failed) == 0) |
|
|
|
print() |
|
if installed_ok: |
|
_print_ok(f"Zaktualizowano: {len(installed_ok)} plików ✓") |
|
if failed: |
|
_print_warn(f"Nieudane: {len(failed)} — sprawdź źródło aktualizacji") |
|
if installed_ok: |
|
_print_warn("Uruchom skrypt ponownie aby załadować zaktualizowane moduły.") |
|
_print_info(f" Rollback: python3 autopilot_updater.py --rollback") |
|
|
|
return len(failed) == 0 |
|
|
|
@classmethod |
|
def _select_files(cls, available: List[str], manifest: Manifest) -> List[str]: |
|
print() |
|
for i, name in enumerate(available, 1): |
|
fi = manifest.files.get(name) |
|
ver = fi.version if fi else "?" |
|
print(f" {i}. {name} [{ver}]") |
|
try: |
|
sel = input("\n Numery (np. 1,3) lub 'all' > ").strip() |
|
except (EOFError, KeyboardInterrupt): |
|
return [] |
|
if sel.lower() == "all": |
|
return available |
|
selected = [] |
|
for part in sel.split(","): |
|
part = part.strip() |
|
if part.isdigit(): |
|
idx = int(part) - 1 |
|
if 0 <= idx < len(available): |
|
selected.append(available[idx]) |
|
return selected |
|
|
|
@classmethod |
|
def _log_update(cls, bundle_ver: str, files: List[str], success: bool) -> None: |
|
history = [] |
|
if UPDATE_LOG.exists(): |
|
try: |
|
with open(UPDATE_LOG) as f: |
|
history = json.load(f) |
|
except Exception: |
|
pass |
|
history.append({ |
|
"ts": time.strftime("%Y-%m-%dT%H:%M:%S"), |
|
"bundle_version": bundle_ver, |
|
"files": files, |
|
"success": success, |
|
}) |
|
history = history[-50:] |
|
with open(UPDATE_LOG, "w") as f: |
|
json.dump(history, f, indent=2) |
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════ |
|
# Helpery print |
|
# ═══════════════════════════════════════════════════════════════════════════ |
|
|
|
_C = { |
|
"s": "\033[92m", "w": "\033[93m", "e": "\033[91m", |
|
"i": "\033[94m", "h": "\033[95m", "r": "\033[0m", "b": "\033[1m", |
|
} |
|
|
|
def _print_ok(msg: str) -> None: print(f"{_C['s']}✓ {msg}{_C['r']}") |
|
def _print_warn(msg: str) -> None: print(f"{_C['w']}⚠ {msg}{_C['r']}") |
|
def _print_info(msg: str) -> None: print(f"{_C['i']}{msg}{_C['r']}") |
|
def _print_header(msg: str) -> None: |
|
sep = "═" * 60 |
|
print(f"\n{_C['h']}{_C['b']}{sep}\n {msg}\n{sep}{_C['r']}\n") |
|
|
|
|
|
def _detect_version() -> str: |
|
p = SCRIPT_DIR / "Autopilot_v157.py" |
|
if not p.exists(): |
|
return "0.0" |
|
try: |
|
for line in p.read_text(encoding="utf-8").splitlines()[:30]: |
|
if line.strip().startswith("VERSION"): |
|
import re |
|
m = re.search(r'["\']([0-9.]+)["\']', line) |
|
if m: |
|
return m.group(1) |
|
except Exception: |
|
pass |
|
return "0.0" |
|
|
|
|
|
def _detect_file_version(path: Path) -> str: |
|
import re |
|
try: |
|
for line in path.read_text(encoding="utf-8").splitlines()[:20]: |
|
m = re.search(r"VERSION[_A-Z]*\s*=\s*['\"]([0-9.]+)['\"]", line) |
|
if m: |
|
return m.group(1) |
|
m = re.search(r"v([0-9]+\.[0-9]+)", line) |
|
if m: |
|
return m.group(1) |
|
except Exception: |
|
pass |
|
return "?" |
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════ |
|
# [FIX-3] register_updater — poprawiona integracja z App |
|
# Nie zastępuje _menu. Przechowuje dispatch w _upd_dispatch. |
|
# ═══════════════════════════════════════════════════════════════════════════ |
|
|
|
def register_updater(app: object) -> None: |
|
""" |
|
Wstrzyknij UpdateManager do instancji App przez _upd_dispatch. |
|
Opcja 'upd' i pochodne są obsługiwane przez pętlę dispatch w _menu. |
|
""" |
|
dispatch_upd = { |
|
"upd": UpdateManager.interactive, |
|
"upd_rb": UpdateManager.rollback, |
|
"upd_hist": UpdateManager.show_history, |
|
"upd_stat": UpdateManager.status, |
|
"upd_gen": UpdateManager.generate_manifest, |
|
} |
|
|
|
if not hasattr(app, "_upd_dispatch"): |
|
app._upd_dispatch = {} |
|
app._upd_dispatch.update(dispatch_upd) |
|
|
|
_print_ok("UpdateManager zarejestrowany: upd upd_rb upd_hist upd_stat upd_gen") |
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════ |
|
# CLI |
|
# ═══════════════════════════════════════════════════════════════════════════ |
|
|
|
if __name__ == "__main__": |
|
import argparse |
|
|
|
parser = argparse.ArgumentParser( |
|
description="PlayBox Titanium — Auto-Update Manager", |
|
formatter_class=argparse.RawDescriptionHelpFormatter, |
|
epilog=""" |
|
PRZYKŁADY: |
|
python3 autopilot_updater.py --check |
|
python3 autopilot_updater.py --install |
|
python3 autopilot_updater.py --rollback |
|
python3 autopilot_updater.py --history |
|
python3 autopilot_updater.py --status |
|
python3 autopilot_updater.py --generate |
|
python3 autopilot_updater.py --generate /srv/playbox |
|
python3 autopilot_updater.py --sources |
|
""" |
|
) |
|
parser.add_argument("--check", action="store_true") |
|
parser.add_argument("--install", action="store_true") |
|
parser.add_argument("--rollback", action="store_true") |
|
parser.add_argument("--history", action="store_true") |
|
parser.add_argument("--status", action="store_true") |
|
parser.add_argument("--generate", nargs="?", const=".", metavar="DIR") |
|
parser.add_argument("--sources", action="store_true") |
|
args = parser.parse_args() |
|
|
|
if args.sources: |
|
_print_header("📡 ŹRÓDŁA AKTUALIZACJI") |
|
if not UPDATE_SOURCES: |
|
_print_warn("Brak skonfigurowanych źródeł.") |
|
_print_info("Edytuj UPDATE_SOURCES w autopilot_updater.py") |
|
for i, src in enumerate(UPDATE_SOURCES, 1): |
|
print(f" {i}. type={src.get('type')} " |
|
f"path/base/url=" |
|
f"{src.get('path') or src.get('base') or src.get('url','?')}") |
|
elif args.generate is not None: |
|
UpdateManager.generate_manifest(args.generate if args.generate != "." else None) |
|
elif args.rollback: |
|
UpdateManager.rollback() |
|
elif args.history: |
|
UpdateManager.show_history() |
|
elif args.status: |
|
UpdateManager.status() |
|
elif args.install: |
|
UpdateManager.check_on_startup(silent=False) |
|
elif args.check: |
|
UpdateManager.check_on_startup(silent=False) |
|
else: |
|
UpdateManager.interactive() |