|  | #!/usr/bin/env python3 | 
        
          |  |  | 
        
          |  | import argparse | 
        
          |  | import mimetypes | 
        
          |  | import os | 
        
          |  | import re | 
        
          |  | import sys | 
        
          |  | import time | 
        
          |  | from pathlib import Path | 
        
          |  | import requests | 
        
          |  | from tqdm import tqdm | 
        
          |  |  | 
        
          |  | API_SUFFIX = "/api" | 
        
          |  |  | 
        
          |  | class OutlineClient: | 
        
          |  | def __init__(self, host: str, api_key: str): | 
        
          |  | self.base = host.rstrip("/") + API_SUFFIX | 
        
          |  | self.session = requests.Session() | 
        
          |  | self.session.headers.update({ | 
        
          |  | "Authorization": f"Bearer {api_key}", | 
        
          |  | "Accept": "application/json", | 
        
          |  | "Content-Type": "application/json", | 
        
          |  | }) | 
        
          |  |  | 
        
          |  | def _post(self, method: str, payload: dict) -> dict: | 
        
          |  | url = f"{self.base}/{method}" | 
        
          |  | for _ in range(5): | 
        
          |  | r = self.session.post(url, json=payload) | 
        
          |  | if r.status_code == 429: | 
        
          |  | retry_after = r.headers.get("Retry-After", "60") | 
        
          |  | try: | 
        
          |  | wait = int(float(retry_after)) | 
        
          |  | except ValueError: | 
        
          |  | wait = 60 | 
        
          |  | print(f"Rate limit hit. Waiting {wait}s...") | 
        
          |  | time.sleep(wait) | 
        
          |  | continue | 
        
          |  | if not r.ok: | 
        
          |  | raise RuntimeError(f"{method} β {r.status_code}: {r.text}") | 
        
          |  | data = r.json() | 
        
          |  | if not data.get("ok"): | 
        
          |  | raise RuntimeError(f"{method} error: {data}") | 
        
          |  | return data["data"] | 
        
          |  | raise RuntimeError(f"{method} failed after retries") | 
        
          |  |  | 
        
          |  | def get_or_create_collection(self, name: str, description: str = "") -> str: | 
        
          |  | existing = self._post("collections.list", {"query": name}) | 
        
          |  | for col in existing: | 
        
          |  | if col["name"].lower() == name.lower(): | 
        
          |  | return col["id"] | 
        
          |  | icon = guess_icon(name, default="π") | 
        
          |  | return self._post("collections.create", { | 
        
          |  | "name": name, | 
        
          |  | "description": description, | 
        
          |  | "icon": icon | 
        
          |  | })["id"] | 
        
          |  |  | 
        
          |  | def update_collection_description(self, collection_id: str, description: str): | 
        
          |  | self._post("collections.update", { | 
        
          |  | "id": collection_id, | 
        
          |  | "description": description | 
        
          |  | }) | 
        
          |  |  | 
        
          |  | def create_document(self, title: str, text: str, collection_id: str) -> dict: | 
        
          |  | return self._post("documents.create", { | 
        
          |  | "title": title, | 
        
          |  | "text": text, | 
        
          |  | "collectionId": collection_id, | 
        
          |  | "icon": guess_icon(title), | 
        
          |  | "publish": True | 
        
          |  | }) | 
        
          |  |  | 
        
          |  | def upload_attachment(self, path: Path) -> str: | 
        
          |  | mime = mimetypes.guess_type(path.name)[0] or "application/octet-stream" | 
        
          |  | meta = { | 
        
          |  | "name": path.name, | 
        
          |  | "contentType": mime, | 
        
          |  | "size": path.stat().st_size, | 
        
          |  | } | 
        
          |  | resp = self._post("attachments.create", meta) | 
        
          |  | with path.open("rb") as fh: | 
        
          |  | files = {"file": (path.name, fh, mime)} | 
        
          |  | requests.post(resp["uploadUrl"], data=resp["form"], files=files).raise_for_status() | 
        
          |  | return resp["attachment"]["url"] | 
        
          |  |  | 
        
          |  | def guess_icon(title: str, default: str = "π") -> str: | 
        
          |  | t = title.lower() | 
        
          |  | if "home" in t: return "π " | 
        
          |  | if "intro" in t or "start" in t: return "π" | 
        
          |  | if "install" in t or "setup" in t: return "π οΈ" | 
        
          |  | if "faq" in t or "help" in t: return "β" | 
        
          |  | if "guide" in t or "tutorial" in t: return "π" | 
        
          |  | if "advanced" in t: return "π¬" | 
        
          |  | if "api" in t: return "π" | 
        
          |  | if "diagram" in t or "arch" in t: return "ποΈ" | 
        
          |  | return default | 
        
          |  |  | 
        
          |  | IMG_RE = re.compile(r"!\[[^\]]*]\(([^)]+)\)") | 
        
          |  | MD_LINK_RE = re.compile(r"\[([^\]]+)]\(([^)]+)\)") | 
        
          |  |  | 
        
          |  | def rewrite_markdown(md: str, root: Path, client: OutlineClient) -> str: | 
        
          |  | def sub_img(m): | 
        
          |  | path = (root / m.group(1).split(" ")[0]).resolve() | 
        
          |  | return m.group(0).replace(m.group(1), client.upload_attachment(path)) if path.exists() else m.group(0) | 
        
          |  | return IMG_RE.sub(sub_img, md) | 
        
          |  |  | 
        
          |  | def fix_internal_links(md: str, url_map: dict[str, tuple[str, str]]) -> str: | 
        
          |  | def replace_link(m): | 
        
          |  | label, link = m.group(1).strip(), m.group(2).split("|")[0].strip() | 
        
          |  | base = os.path.splitext(os.path.basename(link))[0].lower() | 
        
          |  | if base in url_map: | 
        
          |  | title, urlid = url_map[base] | 
        
          |  | slug = re.sub(r"[^\w\s-]", "", title).strip().lower() | 
        
          |  | slug = re.sub(r"[\s_-]+", "-", slug) | 
        
          |  | return f"[{label}](/doc/{slug}-{urlid})" | 
        
          |  | return m.group(0) | 
        
          |  | return MD_LINK_RE.sub(replace_link, md) | 
        
          |  |  | 
        
          |  | def pretty_title(path: Path) -> str: | 
        
          |  | return re.sub(r"[-_]+", " ", path.stem).title() | 
        
          |  |  | 
        
          |  | def export_repo(repo_path: Path, client: OutlineClient, collection_name: str): | 
        
          |  | files = sorted(p for p in repo_path.rglob("*.md") if p.is_file()) | 
        
          |  | home_path = next((f for f in files if f.stem.lower() == "home"), None) | 
        
          |  | home_raw = home_path.read_text(encoding="utf-8") if home_path else "" | 
        
          |  |  | 
        
          |  | docs = [] | 
        
          |  | for f in tqdm(files, desc="Prepare"): | 
        
          |  | raw = f.read_text(encoding="utf-8") | 
        
          |  | md = rewrite_markdown(raw, f.parent, client) | 
        
          |  | docs.append((f, pretty_title(f), md)) | 
        
          |  |  | 
        
          |  | temp_map = {f.stem.lower(): (title, "temp") for f, title, _ in docs} | 
        
          |  | collection_id = client.get_or_create_collection( | 
        
          |  | collection_name, | 
        
          |  | fix_internal_links(home_raw, temp_map).strip() | 
        
          |  | ) | 
        
          |  |  | 
        
          |  | url_map = {} | 
        
          |  | for f, title, content in tqdm(docs, desc="Upload"): | 
        
          |  | fixed_md = fix_internal_links(content, url_map) | 
        
          |  | doc = client.create_document(title, fixed_md, collection_id) | 
        
          |  | url_map[f.stem.lower()] = (title, doc["urlId"]) | 
        
          |  |  | 
        
          |  | if home_raw.strip(): | 
        
          |  | fixed_home = fix_internal_links(home_raw, url_map).strip() | 
        
          |  | client.update_collection_description(collection_id, fixed_home) | 
        
          |  |  | 
        
          |  | def main(): | 
        
          |  | p = argparse.ArgumentParser() | 
        
          |  | p.add_argument("repo", type=Path) | 
        
          |  | p.add_argument("--api-key", required=True) | 
        
          |  | p.add_argument("--collection") | 
        
          |  | p.add_argument("--host", default="https://app.getoutline.com") | 
        
          |  | args = p.parse_args() | 
        
          |  |  | 
        
          |  | if not args.repo.is_dir(): | 
        
          |  | sys.exit("Not a directory: " + str(args.repo)) | 
        
          |  |  | 
        
          |  | client = OutlineClient(args.host, args.api_key) | 
        
          |  | export_repo(args.repo, client, args.collection or args.repo.stem) | 
        
          |  |  | 
        
          |  | if __name__ == "__main__": | 
        
          |  | main() |