#!/usr/bin/env python3
"""
Download all photos & videos from a public iCloud Shared Album.

Usage:
    python3 download_icloud_album.py "https://www.icloud.com/sharedalbum/<locale>/#<ALBUM_TOKEN>"
    python3 download_icloud_album.py <ALBUM_TOKEN> ./my_folder

No Apple login required — the album must be publicly shared.
Only the Python standard library is used (no installation needed).
"""
|
|
|
import json |
|
import os |
|
import sys |
|
import urllib.request |
|
import urllib.error |
|
|
|
# Base-62 alphabet used to decode album tokens; a character's position in
# this string is its digit value: digits first, then A-Z, then a-z.
BASE62 = (
    "0123456789"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "abcdefghijklmnopqrstuvwxyz"
)
|
|
|
|
|
def base62_to_int(s):
    """Decode the base-62 string *s* into an integer.

    Each character's digit value is its position in ``BASE62``; the leftmost
    character is the most significant digit. The empty string decodes to 0.

    Raises:
        ValueError: if *s* contains a character that is not in ``BASE62``.
    """
    # Weight every digit by its place value, counting places from the right.
    return sum(
        BASE62.index(digit) * 62 ** place
        for place, digit in enumerate(reversed(s))
    )
|
|
|
|
|
def initial_host(token):
    """Guess the first server host from the token (corrected later via redirect).

    The partition number is base-62 encoded right after the token's first
    character: one character wide when the token starts with "A", two
    characters wide otherwise. It is rendered with at least two decimal
    digits in the host name.

    Raises:
        IndexError: if *token* is empty.
    """
    if token[0] == "A":
        encoded_partition = token[1:2]
    else:
        encoded_partition = token[1:3]
    partition = base62_to_int(encoded_partition)
    return f"p{partition:02d}-sharedstreams.icloud.com"
|
|
|
|
|
def api_post(host, token, endpoint, payload, *, timeout=60, max_redirects=5):
    """POST *payload* as JSON to the iCloud sharedstreams API and decode the reply.

    The request goes to ``https://<host>/<token>/sharedstreams/<endpoint>``.
    When the server answers with HTTP status 330, the JSON body of that
    response names the correct host under ``"X-Apple-MMe-Host"`` and the
    request is repeated against that host.

    Args:
        host: Host name to try first.
        token: Album token, used as the first path segment.
        endpoint: Endpoint name appended to the path.
        payload: JSON-serialisable request body.
        timeout: Seconds a blocking network operation may take before
            urllib gives up (keyword-only).
        max_redirects: Maximum number of 330 redirects to follow
            (keyword-only).

    Returns:
        A ``(response, host)`` tuple: the decoded JSON response and the host
        that finally served it, so callers can reuse it for later requests.

    Raises:
        urllib.error.HTTPError: for any error status other than 330, or for
            a 330 once ``max_redirects`` redirects have been followed.
        urllib.error.URLError, OSError: on connection failures or timeouts.
    """
    body = json.dumps(payload).encode()
    headers = {"Content-Type": "text/plain", "User-Agent": "Mozilla/5.0"}
    redirects_left = max_redirects

    # Iterate instead of recursing so a server that keeps answering 330
    # cannot drive the call stack (or the run time) without bound.
    while True:
        url = f"https://{host}/{token}/sharedstreams/{endpoint}"
        req = urllib.request.Request(url, data=body, headers=headers)
        try:
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                return json.load(resp), host
        except urllib.error.HTTPError as e:
            if e.code != 330 or redirects_left <= 0:
                raise
            # 330: the error body tells us the correct host. The HTTPError
            # wraps the open response, so close it once it has been read.
            try:
                host = json.load(e)["X-Apple-MMe-Host"]
            finally:
                e.close()
            redirects_left -= 1
|
|
|
|
|
def extract_token(arg):
    """Return the album token from a share URL or from a bare token.

    If *arg* contains ``#``, the token is taken from the text after the first
    ``#`` (the URL fragment); otherwise *arg* itself is treated as the token.
    Surrounding whitespace is removed, and anything from the first ``;``
    onwards is dropped.

    NOTE(review): ``;`` is assumed never to occur inside a token (it is not
    part of the base-62 alphabet); the iCloud web viewer appears to append
    ``;<item id>`` to the fragment when a single item is open — confirm.
    """
    _, has_fragment, fragment = arg.partition("#")
    candidate = fragment if has_fragment else arg
    return candidate.split(";", 1)[0].strip()
|
|
|
|
|
def _pick_best_checksums(photos):
    """Map each photoGuid to the checksum of its largest derivative (by fileSize)."""
    best_checksum = {}
    for photo in photos:
        derivs = photo.get("derivatives") or {}
        if not derivs:
            continue
        best = max(derivs.values(), key=lambda d: int(d.get("fileSize") or 0))
        checksum = best.get("checksum")
        if checksum:
            best_checksum[photo["photoGuid"]] = checksum
    return best_checksum


def _resolve_download_urls(host, token, guids, batch_size=25):
    """Query "webasseturls" in batches and return a {checksum: download URL} dict."""
    url_for_checksum = {}
    for start in range(0, len(guids), batch_size):
        batch = guids[start:start + batch_size]
        # Keep the host returned by the API so later batches skip the redirect.
        assets, host = api_post(host, token, "webasseturls", {"photoGuids": batch})
        locations = assets.get("locations", {})
        for checksum, item in assets.get("items", {}).items():
            loc = locations.get(item["url_location"])
            hosts = loc.get("hosts") if loc else None
            if not hosts:
                # Unknown location or no host listed: leave the item without
                # a URL so it is reported as skipped instead of crashing.
                continue
            scheme = loc.get("scheme", "https")
            url_for_checksum[checksum] = f"{scheme}://{hosts[0]}{item['url_path']}"
    return url_for_checksum


def _file_name(idx, photo, url):
    """Build the local file name "<NNNN>_<date>_<time><ext>" for one album item."""
    # "2023-05-01T12:34:56Z" -> "2023-05-01_12-34-56"
    stamp = (photo.get("dateCreated") or "").replace("T", "_")[:19].replace(":", "-")
    # Extension comes from the URL path (query string removed); default ".jpeg".
    ext = os.path.splitext(url.split("?")[0])[1] or ".jpeg"
    # Without a date the name would be "0001_.jpeg"; collapse that to "0001.jpeg".
    return f"{idx:04d}_{stamp}{ext}".replace("_.", ".")


def _download(url, path):
    """Download *url* to *path* via a temporary ".part" file renamed on success."""
    tmp_path = path + ".part"
    try:
        urllib.request.urlretrieve(url, tmp_path)
        os.replace(tmp_path, path)
    finally:
        # After a successful rename the temp file is gone; if anything failed
        # or the run was interrupted, remove the partial download so that a
        # truncated file is never mistaken for a finished one.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


def main():
    """Command-line entry point: download every item of a shared album.

    Reads the album URL or token from ``sys.argv[1]`` and an optional target
    folder from ``sys.argv[2]`` (default ``"icloud_photos"``). Items are
    numbered in order of their ``dateCreated`` value; files that already
    exist in the target folder are not downloaded again. A failed download
    is reported and the remaining items are still processed.

    Exits with status 1 when no album URL/token is given.
    """
    usage = "Usage: python3 download_icloud_album.py <album-url-or-token> [target-folder]"
    if len(sys.argv) < 2:
        print(usage)
        sys.exit(1)

    token = extract_token(sys.argv[1])
    if not token:
        # e.g. a URL that ends in "#": there is nothing to look up.
        print(usage)
        sys.exit(1)

    out_dir = sys.argv[2] if len(sys.argv) > 2 else "icloud_photos"
    os.makedirs(out_dir, exist_ok=True)

    host = initial_host(token)

    # 1) Fetch album metadata (list of all photos)
    stream, host = api_post(host, token, "webstream", {"streamCtag": None})
    photos = stream.get("photos", [])
    print(f"Found {len(photos)} items in the album.")
    if not photos:
        return

    # Sort chronologically by capture date so the sequential file numbering
    # matches the chronological order.
    photos.sort(key=lambda p: p.get("dateCreated") or "")

    # 2) Pick the highest resolution (= largest file) per photo
    best_checksum = _pick_best_checksums(photos)

    # 3) Resolve the actual download URLs
    guids = [p["photoGuid"] for p in photos]
    url_for_checksum = _resolve_download_urls(host, token, guids)

    # 4) Download
    total = len(photos)
    for idx, p in enumerate(photos, start=1):
        checksum = best_checksum.get(p["photoGuid"])
        url = url_for_checksum.get(checksum) if checksum else None
        if not url:
            print(f"[{idx}/{total}] no URL — skipped")
            continue

        name = _file_name(idx, p, url)
        path = os.path.join(out_dir, name)

        if os.path.exists(path):
            print(f"[{idx}/{total}] {name} (already exists)")
            continue

        print(f"[{idx}/{total}] {name}")
        try:
            _download(url, path)
        except Exception as e:
            # Deliberately broad: one failed item must not abort the whole run.
            print(f" ERROR: {e}")

    print(f"\nDone. Files saved to: {os.path.abspath(out_dir)}/")
|
|
|
|
|
# Run the downloader only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()