Skip to content

Instantly share code, notes, and snippets.

@thushan
Created February 16, 2026 08:02
Show Gist options
  • Select an option

  • Save thushan/f28e6dd543495a7e28b28640cdfa9c0d to your computer and use it in GitHub Desktop.

Select an option

Save thushan/f28e6dd543495a7e28b28640cdfa9c0d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Gets photos from Photoplus in bulk, async & resume support
----
$ python grab-photoplus.py --id 1234567
Querying activity 1234567...
1181 on server, 37 on disk, 1144 to fetch
░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ 8/1144 [2.8 MB/s] 2_7837711.jpg
----
# Basic usage
python grab-photoplus.py --id 1234567
# Custom output directory
python grab-photoplus.py --id 1234567 --output /path/to/photos
# Re-run to grab any new photos (automatically skips existing ones)
python grab-photoplus.py --id 1234567
"""
import argparse
import hashlib
import json
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor, Future
from pathlib import Path
from threading import Lock, Event
SALT = "laxiaoheiwu"
class PhotoPlusGrabber:
ENDPOINT = "https://live.photoplus.cn/pic/pics"
WORKERS = 10
DODGY_CHARS = re.compile(r'[<>:"/\\|?*]')
def __init__(self, activity_id: int, dest: Path):
self.activity_id = activity_id
self.dest = dest
self._lock = Lock()
self._finished_count = 0
self._total_count = 0
self._bytes_grabbed = 0
self._errors: list[tuple[str, str]] = []
self._cancelled = Event()
self._started_at = 0.0
def run(self) -> int:
self.dest.mkdir(parents=True, exist_ok=True)
self._log(f"Querying activity {self.activity_id}...")
catalogue = self._fetch_catalogue()
if catalogue is None:
return 1
remote_urls = {
self._clean_name(u): u for u in catalogue["urls"]
}
if not remote_urls:
self._log("No photos found for this activity.")
return 1
already_got = set(os.listdir(self.dest))
pending = {
name: url
for name, url in remote_urls.items()
if name not in already_got
}
self._log(
f"{catalogue['total']} on server, "
f"{len(already_got)} on disk, "
f"{len(pending)} to fetch"
)
if not pending:
self._log("All up to date.")
return 0
self._total_count = len(pending)
self._finished_count = 0
self._bytes_grabbed = 0
self._errors.clear()
self._started_at = time.monotonic()
self._download_batch(pending)
if self._cancelled.is_set():
self._print_interruption_summary()
return 130
if self._errors:
self._log(f"\n{len(self._errors)} failed:")
for fname, reason in self._errors:
self._log(f" {fname} -- {reason}")
return 1
self._log("All done.")
return 0
def _download_batch(self, pending: dict[str, str]):
tp = ThreadPoolExecutor(max_workers=self.WORKERS)
jobs: dict[Future, str] = {}
for name, url in pending.items():
jobs[tp.submit(self._grab, name, url)] = name
try:
self._wait_for_jobs(jobs)
except KeyboardInterrupt:
self._cancelled.set()
finally:
tp.shutdown(wait=False, cancel_futures=True)
def _wait_for_jobs(self, jobs: dict[Future, str]):
remaining = set(jobs.keys())
while remaining:
# poll with a short timeout so CTRL+C can interrupt between iterations
newly_done = set()
for fut in remaining:
if fut.done():
newly_done.add(fut)
for fut in newly_done:
remaining.discard(fut)
name = jobs[fut]
nbytes = 0
try:
nbytes = fut.result() or 0
except Exception as exc:
self._errors.append((name, str(exc)))
self._bump_progress(name, nbytes)
if remaining and not newly_done:
time.sleep(0.05)
def _fetch_catalogue(self) -> dict | None:
ts = int(time.time() * 1000)
query_bits = {
"activityNo": self.activity_id,
"isNew": False,
"count": 9999,
"page": 1,
"ppSign": "live",
"picUpIndex": "",
"_t": ts,
}
serialised = "&".join(
f"{k}={query_bits[k]}" for k in sorted(query_bits) if query_bits[k] is not None
)
digest = hashlib.md5((serialised + SALT).encode()).hexdigest()
query_bits["_s"] = digest
full_url = f"{self.ENDPOINT}?{urllib.parse.urlencode(query_bits)}"
try:
with urllib.request.urlopen(full_url, timeout=30) as resp:
body = json.loads(resp.read())
except urllib.error.HTTPError as exc:
self._log(f"API returned HTTP {exc.code}. Check the activity ID.")
return None
except urllib.error.URLError as exc:
self._log(f"Could not reach API: {exc.reason}")
return None
except json.JSONDecodeError:
self._log("API returned garbled response. Try again later.")
return None
if not isinstance(body, dict) or "result" not in body:
msg = body.get("msg") or body.get("message") or "unknown error"
self._log(f"API error: {msg}")
return None
bucket = body["result"]
if not isinstance(bucket, dict):
self._log("Unexpected API response shape. The ID may be invalid.")
return None
pics = bucket.get("pics_array")
if pics is None:
self._log("No photo data in response. The activity ID is likely wrong.")
return None
return {
"total": bucket.get("pics_total", len(pics)),
"urls": [f"https:{pic['origin_img']}" for pic in pics if pic.get("origin_img")],
}
def _grab(self, fname: str, url: str) -> int:
if self._cancelled.is_set():
return 0
out_path = self.dest / fname
tmp_path = out_path.with_suffix(out_path.suffix + ".tmp")
try:
with urllib.request.urlopen(url, timeout=60) as resp:
blob = resp.read()
tmp_path.write_bytes(blob)
tmp_path.replace(out_path)
return len(blob)
except Exception:
if tmp_path.exists():
tmp_path.unlink()
raise
def _clean_name(self, url: str) -> str:
raw = url.rsplit("/", 1)[-1].split("?", 1)[0].split("#", 1)[0]
return self.DODGY_CHARS.sub("_", raw)
def _format_speed(self, bytes_per_sec: float) -> str:
if bytes_per_sec >= 1_048_576:
return f"{bytes_per_sec / 1_048_576:.1f} MB/s"
return f"{bytes_per_sec / 1024:.0f} KB/s"
def _bump_progress(self, fname: str, nbytes: int):
with self._lock:
self._finished_count += 1
self._bytes_grabbed += nbytes
n, t = self._finished_count, self._total_count
elapsed = time.monotonic() - self._started_at
speed_str = self._format_speed(self._bytes_grabbed / elapsed) if elapsed > 0.2 else "..."
w = 30
filled = int(w * n / t)
bar = "\u2588" * filled + "\u2591" * (w - filled)
status = f"\r {bar} {n}/{t} [{speed_str}] {fname}"
sys.stderr.write(f"{status:<100}")
sys.stderr.flush()
if n == t:
sys.stderr.write("\n")
def _print_interruption_summary(self):
sys.stderr.write("\n")
self._log(
f"Interrupted. {self._finished_count}/{self._total_count} "
f"downloaded this session."
)
if self._errors:
self._log(f"{len(self._errors)} had errors before interruption.")
self._log("Run the same command again to grab the rest.")
@staticmethod
def _log(msg: str):
print(msg, flush=True)
def _parse_args() -> argparse.Namespace:
ap = argparse.ArgumentParser(description="Bulk-download PhotoPlus albums")
ap.add_argument("--id", type=int, required=True, dest="activity_id")
ap.add_argument("--output", type=str, default=None)
return ap.parse_args()
if __name__ == "__main__":
    # CLI entry point: resolve the output directory, run the grabber, and
    # propagate its exit code to the shell.
    cli = _parse_args()
    target = Path(cli.output) if cli.output else Path("images") / str(cli.activity_id)
    sys.exit(PhotoPlusGrabber(cli.activity_id, target).run())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment