Created
June 18, 2026 11:27
-
-
Save ivorpad/8d8a45a7952ff330a6f051a60a7ffae7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import argparse | |
| import base64 | |
| import json | |
| import mimetypes | |
| import os | |
| import re | |
| import shlex | |
| import sys | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any | |
| from urllib.error import HTTPError, URLError | |
| from urllib.parse import quote, unquote, urljoin, urlparse, urlunparse | |
| from urllib.request import HTTPRedirectHandler, Request, build_opener | |
# Presumably the default output directory; its use is outside this section.
DEFAULT_OUT_DIR = "gpt-export-output"

# Path templates that build_candidate_urls() expands, in this order, for a
# file download. Templates containing {gizmo_id} are skipped when no gizmo ID
# is available.
CANDIDATE_ENDPOINTS = [
    "/backend-api/files/download/{file_id}?gizmo_id={gizmo_id}&download_intent=true",
    "/backend-api/files/download/{file_id}?gizmo_id={gizmo_id}",
    "/backend-api/files/download/{file_id}?download_intent=true",
    "/backend-api/files/download/{file_id}",
    "/backend-api/files/{file_id}/download?gizmo_id={gizmo_id}",
    "/backend-api/files/{file_id}/content?gizmo_id={gizmo_id}",
    "/backend-api/files/{file_id}/download",
    "/backend-api/files/{file_id}/content",
    "/backend-api/files/{file_id}",
    "/backend-api/files/{file_id}/raw",
    "/backend-api/gizmos/{gizmo_id}/files/{file_id}/download",
    "/backend-api/gizmos/{gizmo_id}/files/{file_id}/content",
]

# Path templates that build_gizmo_export_urls() expands for a whole-GPT export.
GIZMO_EXPORT_ENDPOINTS = ["/backend-api/gizmos/{gizmo_id}/export"]

# Lower-case request header names that request_headers_from_entry() does not
# copy out of a HAR entry.
HOP_BY_HOP_HEADERS = {
    ":authority", ":method", ":path", ":scheme",
    "accept-encoding", "connection", "content-length", "host",
    "origin", "referer", "te", "trailer", "transfer-encoding", "upgrade",
}

# The only headers headers_for_url() forwards to a host outside the auth host.
EXTERNAL_REDIRECT_HEADER_ALLOWLIST = {"accept", "accept-language", "user-agent"}

# Lower-case key names that is_sensitive_key() treats as secret on exact match.
SENSITIVE_KEY_EXACT = {
    "api_key", "apikey", "authorization", "cookie", "credentials",
    "csrf", "jwt", "password", "secret", "session",
    "session_id", "set-cookie", "signature", "sig", "token",
}

# Lower-case key names whose string values are treated as URLs by
# sanitize_for_disk() and find_download_url().
SIGNED_URL_KEYS = {
    "content_url", "download_url", "file_url", "href", "signed_url", "url",
}
@dataclass(frozen=True)
class JsonPayload:
    """Immutable record of one HAR entry whose response body parsed as JSON."""

    # Position of the entry in the HAR entry list.
    index: int
    # Request method and full request URL as recorded in the HAR.
    method: str
    url: str
    # Network location and path components of the parsed request URL.
    host: str
    path: str
    # Response status code (0 when the HAR did not record one).
    status: int
    # The response content's "mimeType" value, or "" when absent.
    content_type: str
    # The decoded JSON document.
    data: Any
@dataclass(frozen=True)
class FetchResult:
    """Immutable outcome of one fetch_url() call, successful or not."""

    # HTTP status code, or None when no HTTP response was obtained.
    status: int | None
    # Response headers keyed by lower-cased header name.
    headers: dict[str, str]
    # Raw response body (empty when no response was obtained).
    body: bytes
    # URL of the last request that was made.
    final_url: str
    # Human-readable failure description; None on success.
    error: str | None = None
class NoRedirectHandler(HTTPRedirectHandler):
    """Redirect handler that declines every redirect.

    fetch_url() installs this so that it can follow redirects itself and
    choose the headers for each hop.
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):  # type: ignore[no-untyped-def]
        # Never build a follow-up request, whatever the status code or target.
        return None
def load_har(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return it if it has a top-level "log" key.

    Raises:
        ValueError: the document is not a JSON object containing "log".
            (Invalid JSON surfaces as ``json.JSONDecodeError``, which is a
            ``ValueError`` subclass.)
    """
    document = json.loads(path.read_text(encoding="utf-8"))
    if isinstance(document, dict) and "log" in document:
        return document
    raise ValueError(f"{path} does not look like a HAR file")
def har_entries(har: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the dict-shaped entries of ``har["log"]["entries"]``.

    Any unexpected shape (missing or non-dict "log", missing or non-list
    "entries") yields an empty list instead of raising, and non-dict items
    inside the list are dropped.
    """
    log = har.get("log") if isinstance(har, dict) else None
    # "log" may be present but null or otherwise not an object.
    entries = log.get("entries") if isinstance(log, dict) else None
    if not isinstance(entries, list):
        return []
    return [entry for entry in entries if isinstance(entry, dict)]
def decode_har_response_text(content: dict[str, Any]) -> str | None:
    """Return the response body text stored in a HAR ``content`` object.

    Returns None when "text" is missing or not a string, or when the content
    is marked base64-encoded but cannot be decoded. Decoded bytes are
    interpreted as UTF-8 with undecodable sequences replaced.
    """
    text = content.get("text")
    if not isinstance(text, str):
        return None
    if content.get("encoding") != "base64":
        return text
    try:
        raw = base64.b64decode(text)
    except ValueError:
        # binascii.Error (bad padding) and the non-ASCII input error are both
        # ValueError subclasses; anything else is a real bug and should surface.
        return None
    return raw.decode("utf-8", "replace")
def extract_json_payloads(har: dict[str, Any]) -> list[JsonPayload]:
    """Collect every HAR entry whose response body parses as a JSON object/array.

    Entries with a malformed request/response shape, a non-string or
    unparsable URL, no body text, or a body that is not JSON are skipped.
    ``index`` on each payload is the entry's position among the dict-shaped
    HAR entries.
    """
    payloads: list[JsonPayload] = []
    for index, entry in enumerate(har_entries(har)):
        request = entry.get("request", {})
        response = entry.get("response", {})
        if not isinstance(request, dict) or not isinstance(response, dict):
            continue
        url = request.get("url")
        if not isinstance(url, str):
            continue
        try:
            parsed = urlparse(url)
        except ValueError:
            # e.g. a malformed IPv6 literal; one bad entry must not abort the scan.
            continue
        content = response.get("content", {})
        if not isinstance(content, dict):
            continue
        text = decode_har_response_text(content)
        if not text:
            continue
        # Cheap pre-filter so non-JSON bodies are not handed to the parser.
        if not text.lstrip().startswith(("{", "[")):
            continue
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            continue
        try:
            status = int(response.get("status") or 0)
        except (TypeError, ValueError, OverflowError):
            # A non-numeric status is recorded as "unknown" rather than crashing.
            status = 0
        payloads.append(
            JsonPayload(
                index=index,
                method=str(request.get("method") or "GET"),
                url=url,
                host=parsed.netloc,
                path=parsed.path,
                status=status,
                content_type=str(content.get("mimeType") or ""),
                data=data,
            )
        )
    return payloads
def request_headers_from_entry(entry: dict[str, Any]) -> dict[str, str]:
    """Build replayable request headers from a HAR entry.

    Headers named in HOP_BY_HOP_HEADERS are dropped and CR/LF characters are
    removed from values. If the entry carries no Cookie header, one is
    synthesized from the request's ``cookies`` list. ``accept`` and
    ``user-agent`` defaults are added only when the entry has neither header
    under any capitalization.
    """
    request = entry.get("request", {})
    if not isinstance(request, dict):
        request = {}

    def _strip_newlines(text: str) -> str:
        return text.replace("\r", "").replace("\n", "")

    headers: dict[str, str] = {}
    raw_headers = request.get("headers")
    for header in raw_headers if isinstance(raw_headers, list) else []:
        if not isinstance(header, dict):
            continue
        name = str(header.get("name") or "")
        if not name or name.lower() in HOP_BY_HOP_HEADERS:
            continue
        headers[name] = _strip_newlines(str(header.get("value") or ""))

    present = {name.lower() for name in headers}
    if "cookie" not in present:
        raw_cookies = request.get("cookies")
        cookie_parts = []
        for cookie in raw_cookies if isinstance(raw_cookies, list) else []:
            if not isinstance(cookie, dict):
                continue
            name = str(cookie.get("name") or "")
            if name:
                cookie_parts.append(f"{name}={cookie.get('value') or ''}")
        if cookie_parts:
            # Same CR/LF scrubbing as for headers copied verbatim above.
            headers["Cookie"] = _strip_newlines("; ".join(cookie_parts))

    # Header names are case-insensitive. A plain setdefault("accept", ...)
    # would add a second entry next to a captured "Accept", and urllib
    # (which capitalizes names) would then let the default win.
    if "accept" not in present:
        headers["accept"] = "*/*"
    if "user-agent" not in present:
        headers["user-agent"] = "Mozilla/5.0"
    return headers
def has_auth_material(headers: dict[str, str]) -> bool:
    """Return True if *headers* has an Authorization or Cookie header (any case)."""
    lowered_names = {name.lower() for name in headers}
    return not lowered_names.isdisjoint(("authorization", "cookie"))
def merge_runtime_auth_headers(
    headers: dict[str, str],
    cookie_env: str | None = None,
    authorization_env: str | None = None,
    curl_file: Path | None = None,
) -> dict[str, str]:
    """Return a copy of *headers* with runtime-supplied credentials applied.

    Sources are applied in this order, later ones winning: headers parsed
    from *curl_file*, the Cookie value in the environment variable named by
    *cookie_env*, and the Authorization value in the environment variable
    named by *authorization_env* (given a ``Bearer`` prefix if it has none).
    Unset or empty environment variables are ignored. *headers* itself is
    not modified.
    """
    merged = dict(headers)

    def _replace(name: str, value: str) -> None:
        # Header names are case-insensitive: remove e.g. a captured "cookie"
        # before storing "Cookie" so only one value exists afterwards.
        wanted = name.lower()
        for existing in [key for key in merged if key.lower() == wanted]:
            del merged[existing]
        merged[name] = value.replace("\r", "").replace("\n", "")

    if curl_file:
        for name, value in auth_headers_from_curl_file(curl_file).items():
            _replace(name, value)
    if cookie_env:
        cookie_value = os.environ.get(cookie_env)
        if cookie_value:
            _replace("Cookie", cookie_value)
    if authorization_env:
        authorization_value = os.environ.get(authorization_env)
        if authorization_value:
            # Shared helper keeps the Bearer handling identical to the cURL path.
            normalized = normalize_authorization_header(authorization_value)
            if normalized:
                _replace("Authorization", normalized)
    return merged
def normalize_authorization_header(value: str) -> str:
    """Trim *value* and prefix it with ``Bearer `` unless it already has one.

    An empty (or whitespace-only) value is returned as the empty string. The
    prefix check is case-insensitive and an existing prefix is left as-is.
    """
    token = value.strip()
    needs_prefix = bool(token) and not token.lower().startswith("bearer ")
    return f"Bearer {token}" if needs_prefix else token
def parse_header_line(value: str) -> tuple[str, str] | None:
    """Split a ``Name: value`` line at the first colon.

    Returns the trimmed ``(name, value)`` pair, or None when the line has no
    colon or the name part is empty.
    """
    raw_name, separator, raw_value = value.partition(":")
    if not separator:
        return None
    name = raw_name.strip()
    return (name, raw_value.strip()) if name else None
def auth_headers_from_curl_text(text: str) -> dict[str, str]:
    """Extract Authorization and Cookie headers from a cURL command line.

    Recognized forms: ``-H``/``--header`` followed by a header line, the
    attached ``-HName: value`` form, and ``-b``/``--cookie`` followed by a
    cookie string. The argument of ``--cookie-jar`` is skipped. When a header
    occurs more than once the last occurrence wins.

    Raises:
        ValueError: the text cannot be tokenized with shell quoting rules.
    """
    try:
        tokens = shlex.split(text)
    except ValueError as error:
        raise ValueError(f"could not parse cURL file: {error}") from error

    found: dict[str, str] = {}

    def _absorb(header_line: str) -> None:
        # Keep only the two credential-bearing headers; ignore everything else.
        pair = parse_header_line(header_line)
        if not pair:
            return
        header_name, header_value = pair
        lowered = header_name.lower()
        if lowered == "authorization":
            found["Authorization"] = normalize_authorization_header(header_value)
        elif lowered == "cookie":
            found["Cookie"] = header_value

    position = 0
    total = len(tokens)
    while position < total:
        token = tokens[position]
        has_argument = position + 1 < total
        consumed = 1
        if token in ("-H", "--header") and has_argument:
            _absorb(tokens[position + 1])
            consumed = 2
        elif token.startswith("-H") and token != "-H":
            _absorb(token[2:])
        elif token in ("-b", "--cookie", "--cookie-jar") and has_argument:
            # --cookie-jar names an output file; consume its argument unread.
            if token != "--cookie-jar":
                found["Cookie"] = tokens[position + 1].strip()
            consumed = 2
        position += consumed
    return found
def auth_headers_from_curl_file(path: Path) -> dict[str, str]:
    """Read *path* as UTF-8 and return the auth headers found in its cURL command."""
    return auth_headers_from_curl_text(path.read_text(encoding="utf-8"))
def score_auth_entry(entry: dict[str, Any]) -> int:
    """Score how suitable a HAR entry is as a source of auth headers.

    Returns -1 for entries that cannot be used (malformed request, or a URL
    path without "backend-api"). Otherwise: 20 when the network location
    contains "chatgpt.com" (else 10), +3 for GET requests, and +100 for
    "/backend-api/gizmos/" paths that do not contain "/snorlax/".
    """
    request = entry.get("request", {})
    if not isinstance(request, dict):
        return -1
    target = urlparse(str(request.get("url") or ""))
    route = target.path
    if "backend-api" not in route:
        return -1
    total = 20 if "chatgpt.com" in target.netloc else 10
    if str(request.get("method") or "") == "GET":
        total += 3
    if "/backend-api/gizmos/" in route and "/snorlax/" not in route:
        total += 100
    return total
def best_auth_entry(har: dict[str, Any]) -> dict[str, Any] | None:
    """Return the HAR entry with the highest auth score, or None if none qualify.

    Entries scoring below zero are ignored; ties are broken in favour of the
    entry that appears later in the HAR.
    """
    ranked = [
        (score_auth_entry(entry), position, entry)
        for position, entry in enumerate(har_entries(har))
    ]
    eligible = [candidate for candidate in ranked if candidate[0] >= 0]
    if not eligible:
        return None
    _, _, winner = max(eligible, key=lambda candidate: (candidate[0], candidate[1]))
    return winner
def auth_origin(entry: dict[str, Any] | None) -> tuple[str, str]:
    """Return ``(scheme, network location)`` of the entry's request URL.

    Falls back to ``("https", "chatgpt.com")`` when there is no entry, and
    per component when the URL is missing or lacks that component.
    """
    default_scheme, default_host = "https", "chatgpt.com"
    if not entry:
        return default_scheme, default_host
    request = entry.get("request", {})
    raw_url = "https://chatgpt.com"
    if isinstance(request, dict):
        raw_url = str(request.get("url") or "https://chatgpt.com")
    parts = urlparse(raw_url)
    return parts.scheme or default_scheme, parts.netloc or default_host
def is_same_or_subdomain(host: str, allowed_host: str) -> bool:
    """Return True if *host* is *allowed_host* or one of its subdomains.

    Both arguments may be full network locations: any ``user:password@``
    userinfo and any ``:port`` suffix are ignored, bracketed IPv6 literals are
    compared whole, and comparison is case-insensitive. An empty host on
    either side never matches.
    """

    def _hostname(netloc: str) -> str:
        # Everything up to the last "@" is userinfo, not the host; the real
        # host of "chatgpt.com:pw@evil.example" is "evil.example".
        bare = netloc.rpartition("@")[2].strip().lower()
        if bare.startswith("["):
            # IPv6 literal: the address itself contains colons, so cut at "]".
            closing = bare.find("]")
            return bare[1:closing] if closing != -1 else bare
        return bare.split(":", 1)[0]

    normalized_host = _hostname(host)
    normalized_allowed = _hostname(allowed_host)
    if not normalized_host or not normalized_allowed:
        return False
    return normalized_host == normalized_allowed or normalized_host.endswith("." + normalized_allowed)
def headers_for_url(url: str, auth_headers: dict[str, str], auth_host: str) -> dict[str, str]:
    """Choose which of *auth_headers* may be sent to *url*.

    URLs on *auth_host* or one of its subdomains get a copy of every header.
    Any other host only gets the headers named in
    EXTERNAL_REDIRECT_HEADER_ALLOWLIST, so credentials are not forwarded.
    """
    target_host = urlparse(url).netloc
    if not is_same_or_subdomain(target_host, auth_host):
        permitted: dict[str, str] = {}
        for name, value in auth_headers.items():
            if name.lower() in EXTERNAL_REDIRECT_HEADER_ALLOWLIST:
                permitted[name] = value
        return permitted
    return dict(auth_headers)
def fetch_url(
    url: str,
    auth_headers: dict[str, str],
    auth_host: str,
    timeout: int,
    max_redirects: int = 5,
) -> FetchResult:
    """GET *url*, following up to *max_redirects* redirects by hand.

    Redirects are followed manually so that the headers for every hop are
    chosen again by headers_for_url(); credentials are therefore not carried
    to hosts outside *auth_host*. Only ``http`` and ``https`` URLs are
    fetched, including redirect targets.

    Returns:
        A FetchResult. ``status`` is None and ``error`` is set when no HTTP
        response was obtained (bad URL, unsupported scheme, network failure,
        too many redirects). Non-redirect HTTP error responses are returned
        with their status, headers and body and ``error`` set.
    """
    # Local import: only this function needs the http.client exception base.
    from http.client import HTTPException

    def _failure(at_url: str, message: str) -> FetchResult:
        return FetchResult(status=None, headers={}, body=b"", final_url=at_url, error=message)

    opener = build_opener(NoRedirectHandler)
    current_url = url
    for _ in range(max_redirects + 1):
        try:
            scheme = urlparse(current_url).scheme.lower()
        except ValueError as error:
            return _failure(current_url, f"invalid URL: {error}")
        # The default opener also handles file:// and ftp://. A redirect
        # "Location" is server-controlled and must not be able to reach them.
        if scheme not in {"http", "https"}:
            return _failure(current_url, f"unsupported URL scheme: {scheme or '(none)'}")
        headers = headers_for_url(current_url, auth_headers, auth_host)
        try:
            request = Request(current_url, headers=headers, method="GET")
            with opener.open(request, timeout=timeout) as response:
                body = response.read()
                return FetchResult(
                    status=response.status,
                    headers={key.lower(): value for key, value in response.headers.items()},
                    body=body,
                    final_url=response.geturl(),
                )
        except HTTPError as error:
            # With redirects disabled, 3xx responses arrive here as HTTPError.
            try:
                location = error.headers.get("Location")
                if error.code in {301, 302, 303, 307, 308} and location:
                    current_url = urljoin(current_url, location)
                    continue
                return FetchResult(
                    status=error.code,
                    headers={key.lower(): value for key, value in error.headers.items()},
                    body=error.read(),
                    final_url=current_url,
                    error=str(error),
                )
            finally:
                # HTTPError wraps the open response; release the connection.
                error.close()
        except URLError as error:
            return _failure(current_url, str(error.reason))
        except (OSError, HTTPException, ValueError) as error:
            # Read timeouts, connection resets, truncated bodies and URLs that
            # http.client rejects are reported like any other failed attempt.
            return _failure(current_url, str(error) or type(error).__name__)
    return _failure(current_url, f"too many redirects after {max_redirects}")
def is_gizmo_payload(data: Any) -> bool:
    """Return True if *data* looks like a GPT ("gizmo") API response.

    That means a dict whose "gizmo" value is a dict and which either has a
    truthy gizmo "id", has an "instructions" or "display" key inside the
    gizmo, or has a top-level "files" key.
    """
    gizmo = data.get("gizmo") if isinstance(data, dict) else None
    if not isinstance(gizmo, dict):
        return False
    if gizmo.get("id"):
        return True
    return "instructions" in gizmo or "display" in gizmo or "files" in data
def group_gizmo_payloads(payloads: list[JsonPayload]) -> dict[str, list[JsonPayload]]:
    """Group gizmo payloads by gizmo ID, keeping their original order.

    Payloads that are not gizmo payloads are dropped. A gizmo payload without
    an ID is filed under ``unknown-entry-<index>``.
    """
    by_gizmo: dict[str, list[JsonPayload]] = {}
    for payload in payloads:
        if is_gizmo_payload(payload.data):
            gizmo = payload.data.get("gizmo") if isinstance(payload.data, dict) else {}
            bucket_key = str(gizmo.get("id") or f"unknown-entry-{payload.index}")
            if bucket_key not in by_gizmo:
                by_gizmo[bucket_key] = []
            by_gizmo[bucket_key].append(payload)
    return by_gizmo
def parse_datetime(value: Any) -> float:
    """Convert an ISO-8601 string to a POSIX timestamp, or 0.0 if unusable.

    A ``Z`` suffix is accepted as UTC. Values without any UTC offset are
    treated as UTC as well, so the result does not depend on the timezone of
    the machine running the script. Non-strings and unparsable strings give
    0.0.
    """
    if not isinstance(value, str):
        return 0.0
    try:
        moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if moment.tzinfo is None:
            # A naive datetime's timestamp() would use the local timezone.
            moment = moment.replace(tzinfo=timezone.utc)
        return moment.timestamp()
    except (ValueError, OverflowError):
        return 0.0
def version_value(value: Any) -> int:
    """Return *value* as an int, or -1 when it cannot be converted.

    Infinite floats (which ``json.loads`` produces for ``Infinity``) count as
    unconvertible instead of raising.
    """
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")); ValueError also covers NaN.
        return -1
def latest_payload(payloads: list[JsonPayload]) -> JsonPayload:
    """Return the most recent snapshot among *payloads*.

    Snapshots are ordered by gizmo version, then by "version_updated_at"
    (falling back to "updated_at"), then by position in the HAR.
    """

    def _recency(candidate: JsonPayload) -> tuple[int, float, int]:
        document = candidate.data
        gizmo = document.get("gizmo", {}) if isinstance(document, dict) else {}
        stamp = gizmo.get("version_updated_at") or gizmo.get("updated_at")
        return version_value(gizmo.get("version")), parse_datetime(stamp), candidate.index

    return max(payloads, key=_recency)
def is_sensitive_key(key: str) -> bool:
    """Return True if a dictionary key name suggests a secret value.

    Matching is case-insensitive: exact names from SENSITIVE_KEY_EXACT, names
    ending in ``_token``/``-token``, names starting with ``token_``,
    ``secret_`` or ``credential_``, and names containing ``authorization`` or
    ``set-cookie``.
    """
    name = key.lower()
    return (
        name in SENSITIVE_KEY_EXACT
        or name.endswith(("_token", "-token"))
        or name.startswith(("token_", "secret_", "credential_"))
        or "authorization" in name
        or "set-cookie" in name
    )
def redact_url(value: str) -> str:
    """Strip the secret-bearing parts of an absolute URL.

    The query string, the fragment and any ``user:password@`` userinfo are
    removed; scheme, host (with port), path and path parameters are kept.
    Strings without both a scheme and a network location are returned
    unchanged. A string that cannot be parsed as a URL at all is replaced by
    ``"[REDACTED]"``, since it cannot be cleaned reliably.
    """
    try:
        parsed = urlparse(value)
    except ValueError:
        # e.g. a malformed IPv6 literal; fail closed rather than crash or leak.
        return "[REDACTED]"
    if not parsed.scheme or not parsed.netloc:
        return value
    # Everything up to the last "@" in the network location is userinfo.
    host = parsed.netloc.rpartition("@")[2]
    return urlunparse((parsed.scheme, host, parsed.path, parsed.params, "", ""))
def sanitize_for_disk(value: Any, parent_key: str = "") -> Any:
    """Return a copy of a JSON-like value that is safe to write to disk.

    Dict values under a sensitive key become ``"[REDACTED]"``; other dict
    values and list items are sanitized recursively (all dict keys become
    strings). Strings stored under a key in SIGNED_URL_KEYS or ending in
    ``_url`` go through redact_url(). *parent_key* is the key the value was
    found under; list items inherit the key of their list.
    """
    if isinstance(value, dict):
        return {
            str(key): "[REDACTED]" if is_sensitive_key(str(key)) else sanitize_for_disk(item, str(key))
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [sanitize_for_disk(element, parent_key) for element in value]
    if not isinstance(value, str):
        return value
    owner = parent_key.lower()
    holds_url = owner in SIGNED_URL_KEYS or owner.endswith("_url")
    return redact_url(value) if holds_url else value
def write_json(path: Path, value: Any) -> None:
    """Write *value* to *path* as UTF-8 JSON, creating parent directories.

    Output is indented by two spaces, keeps non-ASCII characters as-is and
    ends with a newline.
    """
    serialized = json.dumps(value, indent=2, ensure_ascii=False)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(serialized + "\n")
def write_text(path: Path, value: str) -> None:
    """Write *value* to *path* as UTF-8 text, creating parent directories."""
    folder = path.parent
    folder.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(value)
def slugify(value: str, fallback: str) -> str:
    """Turn *value* into a short slug, or return *fallback* if nothing is left.

    Runs of characters other than ASCII letters, digits, ``.``, ``_`` and
    ``-`` become a single ``-``; repeated dashes are collapsed; leading and
    trailing ``-``, ``.`` and ``_`` are removed; the result is cut to 80
    characters.
    """
    dashed = re.sub(r"[^A-Za-z0-9._-]+", "-", value.strip())
    collapsed = re.sub(r"-{2,}", "-", dashed)
    slug = collapsed.strip("-._")[:80]
    return slug if slug else fallback
def safe_filename(name: str | None, fallback: str, content_type: str | None = None) -> str:
    """Return a single, filesystem-safe file name.

    *name* is percent-decoded, reduced to its last path component, stripped
    of control characters, and every run of characters outside
    ``A-Za-z0-9._ -`` becomes ``_``. If nothing usable remains, *fallback* is
    cleaned the same way and used instead, so the result can never contain a
    path separator or be ``.``/``..``. When the name has no dot and
    *content_type* maps to a known extension, that extension is appended.
    The result is at most 180 characters; a short extension survives the cut.
    """

    def _clean(raw: str | None) -> str:
        text = unquote(raw or "").replace("\\", "/").rsplit("/", 1)[-1].strip()
        text = re.sub(r"[\x00-\x1f\x7f]+", "", text)
        text = re.sub(r"[^A-Za-z0-9._ -]+", "_", text)
        return text.strip(" .")

    # The fallback is built from IDs found in the HAR, so it is just as
    # untrusted as the name and must not be used raw.
    candidate = _clean(name) or _clean(fallback) or "download"
    if "." not in candidate and content_type:
        extension = mimetypes.guess_extension(content_type.split(";", 1)[0].strip())
        if extension:
            candidate += extension

    limit = 180
    if len(candidate) > limit:
        stem, dot, suffix = candidate.rpartition(".")
        if dot and stem and len(suffix) <= 16:
            # Shorten the stem instead of chopping off the extension.
            candidate = f"{stem[: limit - len(suffix) - 1].rstrip(' .')}.{suffix}"
        else:
            candidate = candidate[:limit].rstrip(" .")
    return candidate
def unique_path(path: Path) -> Path:
    """Return *path*, or a numbered sibling if *path* already exists.

    Siblings are ``<stem>-2<suffix>`` up to ``<stem>-9999<suffix>``; the
    first one that does not exist is returned.

    Raises:
        RuntimeError: every candidate name is already taken.
    """
    if not path.exists():
        return path
    base, extension = path.stem, path.suffix
    numbered = (path.with_name(f"{base}-{counter}{extension}") for counter in range(2, 10_000))
    available = next((option for option in numbered if not option.exists()), None)
    if available is None:
        raise RuntimeError(f"could not find unique filename for {path}")
    return available
def walk_json(value: Any, path: str = "$"):
    """Yield ``(path, node)`` for *value* and everything nested inside it.

    Traversal is depth-first with each container yielded before its contents.
    Paths are built as ``$.key`` for dict members and ``$[index]`` for list
    items.
    """
    yield path, value
    if isinstance(value, dict):
        children = ((f"{path}.{key}", item) for key, item in value.items())
    elif isinstance(value, list):
        children = ((f"{path}[{position}]", item) for position, item in enumerate(value))
    else:
        return
    for child_path, child in children:
        yield from walk_json(child, child_path)
def file_id_from_dict(value: dict[str, Any]) -> str | None:
    """Return the file ID carried by *value*, if it has one.

    A non-empty string under "file_id" wins; otherwise a string under "id"
    is accepted only when it starts with ``file_``.
    """
    explicit = value.get("file_id")
    if isinstance(explicit, str) and explicit:
        return explicit
    generic = value.get("id")
    looks_like_file = isinstance(generic, str) and generic.startswith("file_")
    return generic if looks_like_file else None
def extract_files(payloads: list[JsonPayload]) -> list[dict[str, Any]]:
    """Collect one metadata record per file ID referenced in *payloads*.

    Every dict nested anywhere in a payload that carries a file ID
    contributes a record. Records for the same ID are merged: fields that are
    still empty (None or "") are filled from later sightings, and "sources"
    accumulates a sorted, de-duplicated list of ``entry:<index>:<json path>``
    locations. Records are returned in order of first sighting.
    """

    def _is_blank(candidate: Any) -> bool:
        # Not a set-membership test: fields such as "metadata" can be dicts or
        # lists, which are unhashable and would raise TypeError.
        return candidate is None or candidate == ""

    files_by_id: dict[str, dict[str, Any]] = {}
    for payload in payloads:
        for source_path, value in walk_json(payload.data):
            if not isinstance(value, dict):
                continue
            file_id = file_id_from_dict(value)
            if not file_id:
                continue
            file_meta = {
                "file_id": file_id,
                "name": value.get("name") or value.get("filename") or value.get("file_name"),
                "type": value.get("type") or value.get("mime_type") or value.get("content_type"),
                "size": value.get("size") or value.get("bytes") or value.get("file_size"),
                "created_at": value.get("created_at"),
                "last_modified": value.get("last_modified"),
                "location": value.get("location"),
                "library_file_id": value.get("library_file_id"),
                "metadata": value.get("metadata"),
                "file_size_tokens": value.get("file_size_tokens"),
                "sources": [f"entry:{payload.index}:{source_path}"],
            }
            existing = files_by_id.get(file_id)
            if existing is None:
                files_by_id[file_id] = file_meta
                continue
            for key, item in file_meta.items():
                if key == "sources":
                    existing[key] = sorted(set(existing.get(key, []) + item))
                elif _is_blank(existing.get(key)) and not _is_blank(item):
                    existing[key] = item
    return list(files_by_id.values())
def content_type(headers: dict[str, str]) -> str:
    """Return the "content-type" entry of *headers*, or "" when it is absent.

    The lookup is for the lower-case key only.
    """
    try:
        return headers["content-type"]
    except KeyError:
        return ""
def looks_like_json_response(result: FetchResult) -> bool:
    """Return True if the response declares JSON or its body starts like JSON.

    The body check ignores leading whitespace and looks for ``{`` or ``[``.
    """
    declared = content_type(result.headers).lower()
    if "application/json" in declared:
        return True
    return result.body.lstrip().startswith((b"{", b"["))
def looks_like_html_response(result: FetchResult) -> bool:
    """Return True if the response declares HTML or its body starts like HTML.

    Only the first 512 bytes of the body are inspected, case-insensitively
    and ignoring leading whitespace.
    """
    declared = content_type(result.headers).lower()
    opening = result.body[:512].lstrip().lower()
    if "text/html" in declared:
        return True
    return opening.startswith((b"<!doctype html", b"<html"))
def json_from_response(result: FetchResult) -> Any | None:
    """Decode the response body as UTF-8 JSON; return None if that fails."""
    try:
        text = result.body.decode("utf-8")
        return json.loads(text)
    except Exception:
        # Deliberately best-effort: any decoding or parsing problem means
        # "no JSON available".
        return None
def find_download_url(value: Any) -> str | None:
    """Return the first http(s) URL stored under a known URL key, if any.

    Dicts anywhere inside *value* are searched in traversal order; a key
    counts when its lower-cased name is in SIGNED_URL_KEYS.
    """
    for _, node in walk_json(value):
        if isinstance(node, dict):
            for key, candidate in node.items():
                is_url_key = str(key).lower() in SIGNED_URL_KEYS
                if is_url_key and isinstance(candidate, str) and candidate.startswith(("https://", "http://")):
                    return candidate
    return None
def url_label(url: str) -> str:
    """Return *url* reduced to ``scheme://host/path`` for use in reports.

    Path parameters, the query string, the fragment and any
    ``user:password@`` userinfo are dropped so that reports written to disk
    carry no credentials or signatures.
    """
    parsed = urlparse(url)
    # Everything up to the last "@" in the network location is userinfo.
    host = parsed.netloc.rpartition("@")[2]
    return urlunparse((parsed.scheme, host, parsed.path, "", "", ""))
def build_candidate_urls(
    scheme: str,
    host: str,
    file_id: str,
    gizmo_id: str | None,
) -> list[str]:
    """Expand CANDIDATE_ENDPOINTS into absolute download URLs for one file.

    IDs are percent-encoded with no safe characters. Templates that need a
    gizmo ID are skipped when *gizmo_id* is empty. Template order is kept and
    duplicate URLs are dropped.
    """
    substitutions = {
        "file_id": quote(file_id, safe=""),
        "gizmo_id": quote(gizmo_id or "", safe=""),
    }
    origin = f"{scheme}://{host}"
    # An insertion-ordered dict gives de-duplication with stable ordering.
    ordered: dict[str, None] = {}
    for template in CANDIDATE_ENDPOINTS:
        needs_gizmo = "{gizmo_id}" in template
        if needs_gizmo and not gizmo_id:
            continue
        ordered.setdefault(origin + template.format(**substitutions), None)
    return list(ordered)
def save_file_response(result: FetchResult, file_meta: dict[str, Any], out_dir: Path) -> str:
    """Write a downloaded file body into *out_dir* and return the path used.

    The file name comes from the metadata "name", falling back to
    ``<file_id>.bin``. The metadata "type" (or else the response content
    type) is used to guess a missing extension. An existing file is never
    overwritten; a numbered name is chosen instead.
    """
    declared_type = str(file_meta.get("type") or content_type(result.headers) or "")
    filename = safe_filename(
        str(file_meta.get("name") or ""),
        fallback=f"{file_meta['file_id']}.bin",
        content_type=declared_type,
    )
    destination = unique_path(out_dir / filename)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_bytes(result.body)
    return str(destination)
def content_disposition_filename(value: str | None) -> str | None:
    """Extract the file name from a Content-Disposition header value.

    The extended ``filename*=UTF-8'<lang>'<percent-encoded>`` parameter is
    preferred and percent-decoded. Otherwise the plain ``filename=``
    parameter is returned as written, with surrounding quotes removed.
    Returns None when the header is missing or names no file.
    """
    if not value:
        return None
    # "\*" is a literal asterisk; the language tag between the quotes is optional.
    extended = re.search(r"filename\*\s*=\s*UTF-8'[^']*'([^;]+)", value, flags=re.IGNORECASE)
    if extended:
        return unquote(extended.group(1).strip().strip('"'))
    # "filename=" cannot match inside "filename*=", so this is the plain form only.
    plain = re.search(r'filename="?([^";]+)"?', value, flags=re.IGNORECASE)
    if plain:
        return plain.group(1).strip()
    return None
def save_gizmo_export_response(result: FetchResult, gizmo_id: str, out_dir: Path) -> str:
    """Write a GPT export body into *out_dir* and return the path used.

    The file name is taken from the Content-Disposition header, falling back
    to ``<gizmo_id>-export.zip``. A name that still has no dot gets ``.zip``.
    An existing file is never overwritten.
    """
    suggested = content_disposition_filename(result.headers.get("content-disposition"))
    filename = safe_filename(
        suggested,
        fallback=f"{gizmo_id}-export.zip",
        content_type=content_type(result.headers),
    )
    if "." not in filename:
        filename = f"{filename}.zip"
    destination = unique_path(out_dir / filename)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_bytes(result.body)
    return str(destination)
def build_gizmo_export_urls(scheme: str, host: str, gizmo_id: str) -> list[str]:
    """Expand GIZMO_EXPORT_ENDPOINTS into absolute URLs for *gizmo_id*.

    The ID is percent-encoded with no safe characters.
    """
    origin = f"{scheme}://{host}"
    encoded_id = quote(gizmo_id, safe="")
    urls: list[str] = []
    for template in GIZMO_EXPORT_ENDPOINTS:
        urls.append(f"{origin}{template.format(gizmo_id=encoded_id)}")
    return urls
def download_gizmo_export(
    gizmo_id: str,
    scheme: str,
    host: str,
    auth_headers: dict[str, str],
    auth_host: str,
    out_dir: Path,
    timeout: int,
) -> dict[str, Any]:
    """Try each export endpoint for a GPT and save the first usable response.

    A response is usable when it has status 200, a non-empty body, and looks
    like neither JSON nor HTML.

    Returns:
        A report dict with "gizmo_id", "saved", "saved_path" and "attempts".
        Each attempt records the query-free request and final URLs, status,
        content type and error, plus the sanitized JSON body for JSON
        responses.
    """
    report: dict[str, Any] = {
        "gizmo_id": gizmo_id,
        "saved": False,
        "saved_path": None,
        "attempts": [],
    }
    attempts = report["attempts"]
    for url in build_gizmo_export_urls(scheme, host, gizmo_id):
        result = fetch_url(url, auth_headers, auth_host, timeout=timeout)
        attempt = {
            "url": url_label(url),
            "status": result.status,
            "content_type": content_type(result.headers),
            "final_url": url_label(result.final_url),
            "error": result.error,
        }
        is_json = looks_like_json_response(result)
        if is_json:
            attempt["json_response"] = sanitize_for_disk(json_from_response(result))
        attempts.append(attempt)

        succeeded = result.status == 200 and bool(result.body)
        if not succeeded:
            continue
        # JSON and HTML bodies are API or error pages, not the export archive.
        if is_json or looks_like_html_response(result):
            continue
        report["saved_path"] = save_gizmo_export_response(result, gizmo_id, out_dir)
        report["saved"] = True
        break
    return report
def download_one_file(
    file_meta: dict[str, Any],
    gizmo_id: str | None,
    scheme: str,
    host: str,
    auth_headers: dict[str, str],
    auth_host: str,
    out_dir: Path,
    timeout: int,
) -> dict[str, Any]:
    """Try each candidate endpoint for one file and save the first usable body.

    A 200 response with a JSON body is searched for a download URL, which is
    then fetched as a second step. A 200 response that is neither JSON nor
    HTML is saved directly.

    Returns:
        A report dict with "file_id", "name", "saved", "saved_path" and
        "attempts". Every request made, including follow-up fetches of a
        signed URL (marked "via_signed_url"), is recorded as an attempt with
        query-free URLs.
    """
    report: dict[str, Any] = {
        "file_id": file_meta.get("file_id"),
        "name": file_meta.get("name"),
        "saved": False,
        "saved_path": None,
        "attempts": [],
    }
    attempts = report["attempts"]

    def _describe(requested: str, outcome: FetchResult) -> dict[str, Any]:
        # Common attempt fields, with URLs reduced to scheme://host/path.
        return {
            "url": url_label(requested),
            "status": outcome.status,
            "content_type": content_type(outcome.headers),
            "final_url": url_label(outcome.final_url),
            "error": outcome.error,
        }

    def _store(outcome: FetchResult) -> dict[str, Any]:
        report["saved_path"] = save_file_response(outcome, file_meta, out_dir)
        report["saved"] = True
        return report

    for url in build_candidate_urls(scheme, host, str(file_meta["file_id"]), gizmo_id):
        result = fetch_url(url, auth_headers, auth_host, timeout=timeout)
        attempt = _describe(url, result)
        is_json = looks_like_json_response(result)
        if is_json:
            attempt["json_response"] = sanitize_for_disk(json_from_response(result))
        attempts.append(attempt)

        if result.status != 200 or not result.body:
            continue

        if is_json:
            data = json_from_response(result)
            signed_url = find_download_url(data)
            if not signed_url:
                attempt["json_response"] = sanitize_for_disk(data)
                continue
            signed_result = fetch_url(signed_url, auth_headers, auth_host, timeout=timeout)
            signed_attempt = _describe(signed_url, signed_result)
            signed_attempt["via_signed_url"] = True
            attempts.append(signed_attempt)
            usable = (
                signed_result.status == 200
                and signed_result.body
                and not looks_like_html_response(signed_result)
            )
            if usable:
                return _store(signed_result)
            continue

        if looks_like_html_response(result):
            # The attempt dict is already in the report; annotate it in place.
            attempt["skipped"] = "html response"
            continue

        return _store(result)
    return report
def compact_gizmo_context(payload: JsonPayload, files: list[dict[str, Any]]) -> dict[str, Any]:
    """Condense a gizmo payload into one flat dict of the fields worth keeping.

    Values are copied from the payload's "gizmo" object, its "display"
    sub-object and the top-level "tools"/"product_features" keys. *files* is
    included as given. Missing fields are None; a payload, gizmo or display
    that is null or not an object is treated as empty rather than raising.
    """

    def _as_dict(candidate: Any) -> dict[str, Any]:
        # .get(key, {}) still returns None for an explicit JSON null.
        return candidate if isinstance(candidate, dict) else {}

    data = _as_dict(payload.data)
    gizmo = _as_dict(data.get("gizmo"))
    display = _as_dict(gizmo.get("display"))
    return {
        "id": gizmo.get("id"),
        "name": display.get("name"),
        "description": display.get("description"),
        "instructions": gizmo.get("instructions"),
        "prompt_starters": display.get("prompt_starters"),
        "model": gizmo.get("model") or gizmo.get("default_model"),
        "voice": gizmo.get("voice"),
        "tools": data.get("tools"),
        "files": files,
        "product_features": data.get("product_features"),
        "permissions": gizmo.get("current_user_permission"),
        "sharing": gizmo.get("sharing"),
        "sharing_targets": gizmo.get("sharing_targets"),
        "created_at": gizmo.get("created_at"),
        "updated_at": gizmo.get("updated_at"),
        "version": gizmo.get("version"),
        "version_created_at": gizmo.get("version_created_at"),
        "version_updated_at": gizmo.get("version_updated_at"),
    }
def render_summary_markdown(payload: JsonPayload, files: list[dict[str, Any]]) -> str:
    """Render a Markdown summary of a gizmo payload and its knowledge files.

    The document has a title (display name, else gizmo ID, else "GPT"), a
    bullet list of basic facts, optional Description / Instructions / Prompt
    Starters sections, and a Knowledge Files table (or a note that none were
    found). A payload, gizmo, display or permission object that is null or
    not an object is treated as empty rather than raising.
    """

    def _as_dict(candidate: Any) -> dict[str, Any]:
        # .get(key, {}) still returns None for an explicit JSON null.
        return candidate if isinstance(candidate, dict) else {}

    def _cell(raw: Any) -> str:
        # A literal "|" or a line break inside a cell would split the row.
        text = str(raw or "")
        return text.replace("|", "\\|").replace("\r", " ").replace("\n", " ")

    data = _as_dict(payload.data)
    gizmo = _as_dict(data.get("gizmo"))
    display = _as_dict(gizmo.get("display"))
    permission = _as_dict(gizmo.get("current_user_permission"))

    name = display.get("name") or gizmo.get("id") or "GPT"
    lines = [
        f"# {name}",
        "",
        f"- ID: `{gizmo.get('id', '')}`",
        f"- Version: `{gizmo.get('version', '')}`",
        f"- Created: `{gizmo.get('created_at', '')}`",
        f"- Updated: `{gizmo.get('updated_at', '')}`",
        f"- Can export: `{permission.get('can_export', '')}`",
        "",
    ]

    description = display.get("description")
    if description:
        lines.extend(["## Description", "", str(description), ""])

    instructions = gizmo.get("instructions")
    if instructions:
        lines.extend(["## Instructions", "", str(instructions).rstrip(), ""])

    starters = display.get("prompt_starters")
    # Only a real sequence is listed; iterating a bare string would emit one
    # bullet per character.
    if starters and isinstance(starters, (list, tuple)):
        lines.extend(["## Prompt Starters", ""])
        lines.extend(f"- {starter}" for starter in starters)
        lines.append("")

    lines.extend(["## Knowledge Files", ""])
    if files:
        lines.append("| Name | File ID | Type | Size |")
        lines.append("| --- | --- | --- | ---: |")
        for file_meta in files:
            cells = [
                _cell(file_meta.get("name")),
                f"`{_cell(file_meta.get('file_id'))}`",
                _cell(file_meta.get("type")),
                _cell(file_meta.get("size")),
            ]
            lines.append("| " + " | ".join(cells) + " |")
    else:
        lines.append("No file references were found in the HAR payloads.")
    lines.append("")
    return "\n".join(lines)
def version_dir_name(payload: JsonPayload) -> str:
    """Return the directory name for one snapshot of a gizmo.

    The name is ``version-<version>-entry-<index>``, with ``version-unknown``
    used when the gizmo has no version (None or "").
    """
    document = payload.data
    gizmo = document.get("gizmo", {}) if isinstance(document, dict) else {}
    version = gizmo.get("version")
    if version in {None, ""}:
        prefix = "version-unknown"
    else:
        prefix = f"version-{version}"
    return f"{prefix}-entry-{payload.index}"
def write_har_index(payloads: list[JsonPayload], out_dir: Path) -> None:
    """Write ``har-index.json``: one summary row per JSON payload.

    Rows hold only request/response metadata and a gizmo-payload flag, never
    the payload body.
    """
    rows: list[dict[str, Any]] = []
    for payload in payloads:
        rows.append(
            {
                "entry": payload.index,
                "method": payload.method,
                "host": payload.host,
                "path": payload.path,
                "status": payload.status,
                "content_type": payload.content_type,
                "is_gizmo_payload": is_gizmo_payload(payload.data),
            }
        )
    write_json(out_dir / "har-index.json", rows)
def dump_all_json(payloads: list[JsonPayload], out_dir: Path) -> None:
    """Write every JSON payload, sanitized, under ``har-json-responses/``.

    Each file is named from the entry index, method and path (slugified) and
    holds the request/response metadata next to the sanitized payload.
    """
    target_dir = out_dir / "har-json-responses"
    for payload in payloads:
        stem = slugify(f"{payload.index}-{payload.method}-{payload.path}", f"entry-{payload.index}")
        origin = {
            "entry": payload.index,
            "method": payload.method,
            "host": payload.host,
            "path": payload.path,
            "status": payload.status,
            "content_type": payload.content_type,
        }
        document = {"source": origin, "payload": sanitize_for_disk(payload.data)}
        write_json(target_dir / f"{stem}.json", document)
def _dict_or_empty(value: Any) -> dict[str, Any]:
    """Return *value* unchanged when it is a dict, otherwise a new empty dict."""
    return value if isinstance(value, dict) else {}


def _snapshot_source(payload: JsonPayload) -> dict[str, Any]:
    """Build the ``source`` record describing the HAR entry behind *payload*."""
    return {
        "entry": payload.index,
        "method": payload.method,
        "host": payload.host,
        "path": payload.path,
        "status": payload.status,
    }


def _download_blocker(auth_headers: Any, auth_ready: Any) -> str | None:
    """Return the error text explaining why downloads cannot run, else ``None``."""
    if not auth_headers:
        return "no ChatGPT backend auth request found in HAR"
    if not auth_ready:
        return "ChatGPT backend request found, but no Cookie or Authorization material was captured in the HAR"
    return None


def export_gpts(
    har: dict[str, Any],
    har_path: Path,
    out_dir: Path,
    download: bool,
    dump_json: bool,
    timeout: int,
    cookie_env: str | None = None,
    authorization_env: str | None = None,
    curl_file: Path | None = None,
) -> dict[str, Any]:
    """Export every GPT found in a HAR capture into ``out_dir``.

    For each gizmo id returned by ``group_gizmo_payloads`` a directory is
    created under ``out_dir / "gpts"`` holding the summary, instructions,
    metadata, context, file list, tools, product features, one payload per
    snapshot under ``versions/``, and the two download reports. A
    ``manifest.json`` covering all GPTs is written last.

    Args:
        har: Parsed HAR document.
        har_path: Path the HAR was loaded from; recorded in the manifest.
        out_dir: Root output directory (created if missing).
        download: When true, attempt the gizmo export and file downloads.
        dump_json: When true, also call ``dump_all_json`` for all payloads.
        timeout: Passed through to the download helpers.
        cookie_env: Name of an environment variable forwarded to
            ``merge_runtime_auth_headers``.
        authorization_env: Name of an environment variable forwarded to
            ``merge_runtime_auth_headers``.
        curl_file: Path forwarded to ``merge_runtime_auth_headers``.

    Returns:
        The manifest dict (its ``sanitize_for_disk`` form is what is written
        to ``manifest.json``).
    """
    payloads = extract_json_payloads(har)
    grouped = group_gizmo_payloads(payloads)

    auth_entry = best_auth_entry(har)
    # One lookup serves both the request origin and the auth host: the
    # original derived both from identical auth_origin(auth_entry) calls.
    scheme, host = auth_origin(auth_entry)
    auth_host = host
    auth_headers = request_headers_from_entry(auth_entry) if auth_entry else {}
    auth_headers = merge_runtime_auth_headers(
        auth_headers,
        cookie_env=cookie_env,
        authorization_env=authorization_env,
        curl_file=curl_file,
    )
    auth_ready = has_auth_material(auth_headers)
    # Computed once: the same reason applies to every GPT in this run.
    blocker = _download_blocker(auth_headers, auth_ready)

    out_dir.mkdir(parents=True, exist_ok=True)
    write_har_index(payloads, out_dir)
    if dump_json:
        dump_all_json(payloads, out_dir)

    manifest: dict[str, Any] = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "source_har": str(har_path),
        "json_payload_count": len(payloads),
        "gpt_count": len(grouped),
        "download_attempted": download,
        "auth_material_found": auth_ready,
        "runtime_cookie_env_used": bool(cookie_env and os.environ.get(cookie_env)),
        "runtime_authorization_env_used": bool(authorization_env and os.environ.get(authorization_env)),
        "runtime_curl_file_used": bool(curl_file),
        "gpts": [],
    }

    for gizmo_id, snapshots in sorted(grouped.items()):
        latest = latest_payload(snapshots)
        files = extract_files(snapshots)

        # dict.get returns a stored JSON null as None rather than the default,
        # so every level is normalised to a dict before .get is called on it.
        data = _dict_or_empty(latest.data)
        gizmo = _dict_or_empty(data.get("gizmo"))
        display = _dict_or_empty(gizmo.get("display"))
        name = str(display.get("name") or gizmo_id)

        gpt_dir = out_dir / "gpts" / slugify(f"{name}-{gizmo_id}", gizmo_id)
        gpt_dir.mkdir(parents=True, exist_ok=True)

        write_text(gpt_dir / "summary.md", render_summary_markdown(latest, files))
        write_text(gpt_dir / "instructions.md", str(gizmo.get("instructions") or ""))
        write_json(
            gpt_dir / "metadata.json",
            {
                "source": _snapshot_source(latest),
                "payload": sanitize_for_disk(latest.data),
            },
        )
        write_json(gpt_dir / "context.json", sanitize_for_disk(compact_gizmo_context(latest, files)))
        write_json(gpt_dir / "files.json", sanitize_for_disk(files))
        write_json(gpt_dir / "tools.json", sanitize_for_disk(data.get("tools")))
        write_json(gpt_dir / "product-features.json", sanitize_for_disk(data.get("product_features")))

        versions_dir = gpt_dir / "versions"
        for snapshot in snapshots:
            write_json(
                versions_dir / version_dir_name(snapshot) / "payload.json",
                {
                    "source": _snapshot_source(snapshot),
                    "payload": sanitize_for_disk(snapshot.data),
                },
            )

        gizmo_export_report: dict[str, Any] | None = None
        if download:
            if blocker is None:
                gizmo_export_report = download_gizmo_export(
                    gizmo_id=gizmo_id,
                    scheme=scheme,
                    host=host,
                    auth_headers=auth_headers,
                    auth_host=auth_host,
                    out_dir=gpt_dir / "source-export",
                    timeout=timeout,
                )
            else:
                gizmo_export_report = {"error": blocker}
        # An absent (or empty) report is written as an empty list, as before.
        write_json(gpt_dir / "gizmo-export-report.json", sanitize_for_disk(gizmo_export_report or []))

        download_reports: list[dict[str, Any]] = []
        if download and files:
            if blocker is not None:
                download_reports.append({"error": blocker})
            else:
                for file_meta in files:
                    download_reports.append(
                        download_one_file(
                            file_meta=file_meta,
                            gizmo_id=gizmo_id,
                            scheme=scheme,
                            host=host,
                            auth_headers=auth_headers,
                            auth_host=auth_host,
                            out_dir=gpt_dir / "knowledge",
                            timeout=timeout,
                        )
                    )
        write_json(gpt_dir / "download-report.json", sanitize_for_disk(download_reports))

        manifest["gpts"].append(
            {
                "id": gizmo_id,
                "name": name,
                "directory": str(gpt_dir),
                "snapshot_count": len(snapshots),
                "latest_version": gizmo.get("version"),
                "file_count": len(files),
                "downloaded_file_count": sum(1 for item in download_reports if item.get("saved")),
                "gizmo_export_saved": bool(gizmo_export_report and gizmo_export_report.get("saved")),
            }
        )

    write_json(out_dir / "manifest.json", sanitize_for_disk(manifest))
    return manifest
def build_parser() -> argparse.ArgumentParser:
    """Construct the command-line parser for the HAR exporter.

    Arguments are declared as ``(flag, options)`` pairs and registered in
    order: the positional ``har_path`` followed by ``--out-dir``,
    ``--no-download``, ``--dump-all-json``, ``--timeout``, ``--cookie-env``,
    ``--authorization-env`` and ``--curl-file``.
    """
    cli = argparse.ArgumentParser(
        description="Export Custom GPT metadata and knowledge files from a ChatGPT HAR file."
    )
    argument_specs = [
        ("har_path", {"type": Path, "help": "Path to chatgpt.com.har"}),
        (
            "--out-dir",
            {
                "type": Path,
                "default": Path(DEFAULT_OUT_DIR),
                "help": f"Output directory. Default: {DEFAULT_OUT_DIR}",
            },
        ),
        (
            "--no-download",
            {
                "action": "store_true",
                "help": "Only export metadata from the HAR. Do not make network calls.",
            },
        ),
        (
            "--dump-all-json",
            {
                "action": "store_true",
                "help": "Write sanitized JSON responses for every JSON payload in the HAR.",
            },
        ),
        (
            "--timeout",
            {
                "type": int,
                "default": 60,
                "help": "Download timeout in seconds. Default: 60.",
            },
        ),
        (
            "--cookie-env",
            {
                "help": "Name of an environment variable containing a Cookie header value for ChatGPT requests.",
            },
        ),
        (
            "--authorization-env",
            {
                "help": "Name of an environment variable containing a Bearer token or full Authorization header value.",
            },
        ),
        (
            "--curl-file",
            {
                "type": Path,
                "help": "Text file containing a copied browser cURL command. Only auth headers are read.",
            },
        ),
    ]
    for flag, options in argument_specs:
        cli.add_argument(flag, **options)
    return cli
def main(argv: list[str] | None = None) -> int:
    """Run the exporter from the command line and return a process exit code.

    Parses *argv* with ``build_parser``, loads the HAR and calls
    ``export_gpts``. Any exception from loading or exporting is reported on
    stderr and yields ``1``; otherwise a per-GPT summary is printed and ``0``
    is returned.
    """
    options = build_parser().parse_args(argv)
    wants_download = not options.no_download

    # Top-level boundary: every failure becomes a one-line message plus a
    # non-zero exit code instead of a traceback.
    try:
        manifest = export_gpts(
            har=load_har(options.har_path),
            har_path=options.har_path,
            out_dir=options.out_dir,
            download=wants_download,
            dump_json=options.dump_all_json,
            timeout=options.timeout,
            cookie_env=options.cookie_env,
            authorization_env=options.authorization_env,
            curl_file=options.curl_file,
        )
    except Exception as failure:
        print(f"error: {failure}", file=sys.stderr)
        return 1

    print(f"Exported {manifest['gpt_count']} GPT(s) into {options.out_dir}")
    if wants_download and not manifest.get("auth_material_found"):
        print("Download skipped: the HAR did not capture Cookie or Authorization material.")

    exported = manifest["gpts"]
    for entry in exported:
        summary_line = (
            f"- {entry['name']} | {entry['id']} | "
            f"{entry['file_count']} file ref(s), {entry['downloaded_file_count']} downloaded"
        )
        print(summary_line)
    if not exported:
        print("No GPT gizmo payloads were found. Export a HAR while viewing the GPT Configure page.")
    return 0
# Script entry point: the integer returned by main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment