Created
February 16, 2026 08:02
-
-
Save thushan/f28e6dd543495a7e28b28640cdfa9c0d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Gets photos from PhotoPlus in bulk, with concurrent downloads & resume support | |
| ---- | |
| $ python grab-photoplus.py --id 1234567 | |
| Querying activity 1234567... | |
| 1181 on server, 37 on disk, 1144 to fetch | |
| ░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ 8/1144 [2.8 MB/s] 2_7837711.jpg | |
| ---- | |
| # Basic usage | |
| python grab-photoplus.py --id 1234567 | |
| # Custom output directory | |
| python grab-photoplus.py --id 1234567 --output /path/to/photos | |
| # Re-run to grab any new photos (automatically skips existing ones) | |
| python grab-photoplus.py --id 1234567 | |
| """ | |
| import argparse | |
| import hashlib | |
| import json | |
| import os | |
| import re | |
| import sys | |
| import time | |
| import urllib.error | |
| import urllib.parse | |
| import urllib.request | |
| from concurrent.futures import ThreadPoolExecutor, Future | |
| from pathlib import Path | |
| from threading import Lock, Event | |
| SALT = "laxiaoheiwu" | |
| class PhotoPlusGrabber: | |
| ENDPOINT = "https://live.photoplus.cn/pic/pics" | |
| WORKERS = 10 | |
| DODGY_CHARS = re.compile(r'[<>:"/\\|?*]') | |
| def __init__(self, activity_id: int, dest: Path): | |
| self.activity_id = activity_id | |
| self.dest = dest | |
| self._lock = Lock() | |
| self._finished_count = 0 | |
| self._total_count = 0 | |
| self._bytes_grabbed = 0 | |
| self._errors: list[tuple[str, str]] = [] | |
| self._cancelled = Event() | |
| self._started_at = 0.0 | |
| def run(self) -> int: | |
| self.dest.mkdir(parents=True, exist_ok=True) | |
| self._log(f"Querying activity {self.activity_id}...") | |
| catalogue = self._fetch_catalogue() | |
| if catalogue is None: | |
| return 1 | |
| remote_urls = { | |
| self._clean_name(u): u for u in catalogue["urls"] | |
| } | |
| if not remote_urls: | |
| self._log("No photos found for this activity.") | |
| return 1 | |
| already_got = set(os.listdir(self.dest)) | |
| pending = { | |
| name: url | |
| for name, url in remote_urls.items() | |
| if name not in already_got | |
| } | |
| self._log( | |
| f"{catalogue['total']} on server, " | |
| f"{len(already_got)} on disk, " | |
| f"{len(pending)} to fetch" | |
| ) | |
| if not pending: | |
| self._log("All up to date.") | |
| return 0 | |
| self._total_count = len(pending) | |
| self._finished_count = 0 | |
| self._bytes_grabbed = 0 | |
| self._errors.clear() | |
| self._started_at = time.monotonic() | |
| self._download_batch(pending) | |
| if self._cancelled.is_set(): | |
| self._print_interruption_summary() | |
| return 130 | |
| if self._errors: | |
| self._log(f"\n{len(self._errors)} failed:") | |
| for fname, reason in self._errors: | |
| self._log(f" {fname} -- {reason}") | |
| return 1 | |
| self._log("All done.") | |
| return 0 | |
| def _download_batch(self, pending: dict[str, str]): | |
| tp = ThreadPoolExecutor(max_workers=self.WORKERS) | |
| jobs: dict[Future, str] = {} | |
| for name, url in pending.items(): | |
| jobs[tp.submit(self._grab, name, url)] = name | |
| try: | |
| self._wait_for_jobs(jobs) | |
| except KeyboardInterrupt: | |
| self._cancelled.set() | |
| finally: | |
| tp.shutdown(wait=False, cancel_futures=True) | |
| def _wait_for_jobs(self, jobs: dict[Future, str]): | |
| remaining = set(jobs.keys()) | |
| while remaining: | |
| # poll with a short timeout so CTRL+C can interrupt between iterations | |
| newly_done = set() | |
| for fut in remaining: | |
| if fut.done(): | |
| newly_done.add(fut) | |
| for fut in newly_done: | |
| remaining.discard(fut) | |
| name = jobs[fut] | |
| nbytes = 0 | |
| try: | |
| nbytes = fut.result() or 0 | |
| except Exception as exc: | |
| self._errors.append((name, str(exc))) | |
| self._bump_progress(name, nbytes) | |
| if remaining and not newly_done: | |
| time.sleep(0.05) | |
| def _fetch_catalogue(self) -> dict | None: | |
| ts = int(time.time() * 1000) | |
| query_bits = { | |
| "activityNo": self.activity_id, | |
| "isNew": False, | |
| "count": 9999, | |
| "page": 1, | |
| "ppSign": "live", | |
| "picUpIndex": "", | |
| "_t": ts, | |
| } | |
| serialised = "&".join( | |
| f"{k}={query_bits[k]}" for k in sorted(query_bits) if query_bits[k] is not None | |
| ) | |
| digest = hashlib.md5((serialised + SALT).encode()).hexdigest() | |
| query_bits["_s"] = digest | |
| full_url = f"{self.ENDPOINT}?{urllib.parse.urlencode(query_bits)}" | |
| try: | |
| with urllib.request.urlopen(full_url, timeout=30) as resp: | |
| body = json.loads(resp.read()) | |
| except urllib.error.HTTPError as exc: | |
| self._log(f"API returned HTTP {exc.code}. Check the activity ID.") | |
| return None | |
| except urllib.error.URLError as exc: | |
| self._log(f"Could not reach API: {exc.reason}") | |
| return None | |
| except json.JSONDecodeError: | |
| self._log("API returned garbled response. Try again later.") | |
| return None | |
| if not isinstance(body, dict) or "result" not in body: | |
| msg = body.get("msg") or body.get("message") or "unknown error" | |
| self._log(f"API error: {msg}") | |
| return None | |
| bucket = body["result"] | |
| if not isinstance(bucket, dict): | |
| self._log("Unexpected API response shape. The ID may be invalid.") | |
| return None | |
| pics = bucket.get("pics_array") | |
| if pics is None: | |
| self._log("No photo data in response. The activity ID is likely wrong.") | |
| return None | |
| return { | |
| "total": bucket.get("pics_total", len(pics)), | |
| "urls": [f"https:{pic['origin_img']}" for pic in pics if pic.get("origin_img")], | |
| } | |
| def _grab(self, fname: str, url: str) -> int: | |
| if self._cancelled.is_set(): | |
| return 0 | |
| out_path = self.dest / fname | |
| tmp_path = out_path.with_suffix(out_path.suffix + ".tmp") | |
| try: | |
| with urllib.request.urlopen(url, timeout=60) as resp: | |
| blob = resp.read() | |
| tmp_path.write_bytes(blob) | |
| tmp_path.replace(out_path) | |
| return len(blob) | |
| except Exception: | |
| if tmp_path.exists(): | |
| tmp_path.unlink() | |
| raise | |
| def _clean_name(self, url: str) -> str: | |
| raw = url.rsplit("/", 1)[-1].split("?", 1)[0].split("#", 1)[0] | |
| return self.DODGY_CHARS.sub("_", raw) | |
| def _format_speed(self, bytes_per_sec: float) -> str: | |
| if bytes_per_sec >= 1_048_576: | |
| return f"{bytes_per_sec / 1_048_576:.1f} MB/s" | |
| return f"{bytes_per_sec / 1024:.0f} KB/s" | |
| def _bump_progress(self, fname: str, nbytes: int): | |
| with self._lock: | |
| self._finished_count += 1 | |
| self._bytes_grabbed += nbytes | |
| n, t = self._finished_count, self._total_count | |
| elapsed = time.monotonic() - self._started_at | |
| speed_str = self._format_speed(self._bytes_grabbed / elapsed) if elapsed > 0.2 else "..." | |
| w = 30 | |
| filled = int(w * n / t) | |
| bar = "\u2588" * filled + "\u2591" * (w - filled) | |
| status = f"\r {bar} {n}/{t} [{speed_str}] {fname}" | |
| sys.stderr.write(f"{status:<100}") | |
| sys.stderr.flush() | |
| if n == t: | |
| sys.stderr.write("\n") | |
| def _print_interruption_summary(self): | |
| sys.stderr.write("\n") | |
| self._log( | |
| f"Interrupted. {self._finished_count}/{self._total_count} " | |
| f"downloaded this session." | |
| ) | |
| if self._errors: | |
| self._log(f"{len(self._errors)} had errors before interruption.") | |
| self._log("Run the same command again to grab the rest.") | |
| @staticmethod | |
| def _log(msg: str): | |
| print(msg, flush=True) | |
| def _parse_args() -> argparse.Namespace: | |
| ap = argparse.ArgumentParser(description="Bulk-download PhotoPlus albums") | |
| ap.add_argument("--id", type=int, required=True, dest="activity_id") | |
| ap.add_argument("--output", type=str, default=None) | |
| return ap.parse_args() | |
if __name__ == "__main__":
    # Script entry point: parse the flags, pick the destination folder and
    # exit with the status code returned by the grabber.
    cli = _parse_args()
    if cli.output:
        target_dir = Path(cli.output)
    else:
        # No --output given: fall back to images/<activity id>.
        target_dir = Path("images") / str(cli.activity_id)
    sys.exit(PhotoPlusGrabber(cli.activity_id, target_dir).run())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment