Created
April 28, 2026 06:18
-
-
Save ParinLL/e5f7869a9abf80c6e79313c78ddd5b93 to your computer and use it in GitHub Desktop.
Obsidian scripts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| 將 Facebook public post 存成 Obsidian Clippings 筆記 | |
| 用法: | |
| python3 scripts/facebook-to-clippings.py <facebook_post_url> | |
| python3 scripts/facebook-to-clippings.py <facebook_post_url> --cookies ~/cookies.txt | |
| python3 scripts/facebook-to-clippings.py <facebook_post_url> --dry-run | |
| python3 scripts/facebook-to-clippings.py <facebook_post_url> --manual | |
| 取得 cookies.txt: | |
| 1. 安裝 Chrome 擴充套件「Get cookies.txt LOCALLY」 | |
| 2. 登入 Facebook 後,在 facebook.com 頁面點擊擴充套件 → Export | |
| 3. 儲存為 www.facebook.com_cookies.txt,放在 scripts/ 目錄下 | |
| 或用 --cookies 指定路徑(支援全站 cookies.txt,會自動過濾只留 Facebook cookies) | |
| 解析策略: | |
| - 貼文內文:優先用 og:description 或 <title> 作為 anchor, | |
| 再從 HTML JSON 找包含 anchor 開頭的完整 message.text | |
| - 作者:og:title / <title> 取第一個「 - 」前的部分(限 60 字), | |
| 避免抓到登入者(JSON actor.name) | |
| - 發文時間:用 text 內容定位 <script> 標籤,往後找 100,000 字內的 | |
| creation_time / publish_time(URL 和時間戳在 HTML 裡可能相距超過 100 萬字) | |
| - 圖片:從 <link rel="preload" data-preloader="CometSinglePost..."> 抓取, | |
| 過濾 t39.30808-1(profile picture) | |
| 輸出: | |
| Clippings/<貼文第一行>.md | |
| """ | |
| import re | |
| import sys | |
| import json | |
| import argparse | |
| import pathlib | |
| import http.cookiejar | |
| from datetime import datetime, timezone | |
# Vault root: the parent of the directory that contains this script.
VAULT = pathlib.Path(__file__).resolve().parent.parent
# Folder inside the vault where the generated notes are written.
CLIPPINGS_DIR = VAULT / "Clippings"
# Cookie file looked up next to this script when --cookies is not given.
DEFAULT_COOKIES = pathlib.Path(__file__).resolve().parent / "www.facebook.com_cookies.txt"
# ── helpers ──────────────────────────────────────────────
def sanitize_filename(name: str) -> str:
    """Return *name* with characters that are illegal in file names replaced.

    The characters ``\\ / : * ? " < > |`` and ASCII control characters
    (NUL, tab, ...) are each replaced by ``-``; surrounding whitespace is
    removed.
    """
    # Strip first so a trailing newline is dropped rather than turned into "-".
    return re.sub(r'[\\/:*?"<>|\x00-\x1f]', '-', name.strip()).strip()
def format_images(images: list) -> str:
    """Render image URLs as Markdown image embeds, one per line.

    Falsy entries (empty strings, None) are skipped. Returns an empty string
    when *images* is empty or None.
    """
    # Empty alt text; the URL is what Obsidian needs to render the embed.
    return "\n".join(f"![]({url})" for url in (images or []) if url)
def build_markdown(post: dict) -> tuple[str, str]:
    """Build the note (YAML frontmatter + body) for a post.

    Args:
        post: Mapping with the post data. Keys read here: ``post_url`` /
            ``original_request_url``, ``text`` / ``post_text``, ``username`` /
            ``user_id``, ``author_url``, ``time`` (datetime or string),
            ``images`` / ``images_lowquality``, ``shared_text``, ``shared_from``.

    Returns:
        ``(filename, markdown)``. The file name is the first non-empty line of
        the post text (at most 80 characters), falling back to
        ``"<author> - <published>"``; both get the ``.md`` suffix.
    """
    # Third-party (PyYAML); imported here so the rest of the module loads
    # without it.
    import yaml

    # ── basic fields ──
    post_url = post.get("post_url") or post.get("original_request_url", "")
    text = post.get("text") or post.get("post_text") or ""
    author = post.get("username") or post.get("user_id") or "unknown"
    author_url = post.get("author_url") or ""

    # Dates: take "today" once so `published` and `created` cannot disagree
    # when the call straddles midnight.
    today = datetime.now().strftime("%Y-%m-%d")
    time_obj = post.get("time")
    if isinstance(time_obj, datetime):
        published = time_obj.strftime("%Y-%m-%d")
    elif isinstance(time_obj, str):
        published = time_obj[:10]
    else:
        published = today
    created = today

    # Description: first 150 characters, whitespace collapsed to one line.
    description = " ".join(text.split())[:150]

    images = post.get("images") or post.get("images_lowquality") or []
    image_md = format_images(images)

    shared_text = post.get("shared_text") or ""
    shared_from = post.get("shared_from") or ""

    # ── body ──
    body_parts = []
    if text:
        body_parts.append(text)
    if image_md:
        body_parts.append(image_md)
    if shared_text:
        # A shared post is rendered as a Markdown block quote.
        quote_lines = "\n".join(f"> {line}" for line in shared_text.splitlines())
        if shared_from:
            body_parts.append(f"**分享自 {shared_from}**\n\n{quote_lines}")
        else:
            body_parts.append(quote_lines)
    body = "\n\n".join(body_parts) if body_parts else "(無法取得貼文內容)"

    # ── frontmatter (matches the format of the existing Clippings) ──
    class QuotedStr(str):
        """Marker type: values of this type are always emitted double-quoted."""

    class ClippingDumper(yaml.Dumper):
        """Private dumper so the representer is not registered on the global
        ``yaml.Dumper`` again on every call."""

    def quoted_representer(dumper, data):
        return dumper.represent_scalar("tag:yaml.org,2002:str", data, style='"')

    ClippingDumper.add_representer(QuotedStr, quoted_representer)

    title = f"{author} on Facebook ({published})"
    front = {
        "title": QuotedStr(title),
        "source": QuotedStr(post_url),
        "author": [QuotedStr(f"[{author}]({author_url})" if author_url else author)],
        "published": published,
        "created": created,
        "description": QuotedStr(description),
        "tags": [QuotedStr("clippings"), QuotedStr("facebook")],
    }
    yaml_str = yaml.dump(
        front,
        Dumper=ClippingDumper,
        allow_unicode=True,
        default_flow_style=False,
        sort_keys=False,
        indent=2,
    )
    frontmatter = f"---\n{yaml_str}---\n"
    markdown = frontmatter + "\n" + body + "\n"

    # File name: first non-empty line of the post, else "author - published".
    first_line = next((l.strip() for l in text.splitlines() if l.strip()), "")
    if first_line:
        filename = sanitize_filename(first_line[:80] + ".md")
    else:
        filename = sanitize_filename(f"{author} - {published}.md")
    return filename, markdown
def filter_facebook_cookies(src: pathlib.Path) -> pathlib.Path:
    """Copy only the Facebook cookies of a Netscape cookies.txt to a temp file.

    Blank lines and ordinary ``#`` comments are kept. Cookie lines are kept
    only when their domain field contains "facebook". Lines with the
    ``#HttpOnly_`` prefix are cookie lines too (``MozillaCookieJar`` parses
    them on current Python versions), so they are filtered by domain instead
    of being passed through as comments.

    Args:
        src: Path of the cookies.txt to read.

    Returns:
        Path of the newly created temporary file. The file is not deleted
        here; the caller owns it.
    """
    import tempfile

    http_only_prefix = "#HttpOnly_"
    kept = []
    for line in src.read_text(encoding="utf-8", errors="ignore").splitlines():
        stripped = line.strip()
        is_http_only = stripped.startswith(http_only_prefix)
        if not stripped or (stripped.startswith("#") and not is_http_only):
            kept.append(line)
            continue
        fields = stripped.split("\t")
        domain = fields[0][len(http_only_prefix):] if is_http_only else fields[0]
        if len(fields) >= 6 and "facebook" in domain.lower():
            kept.append(line)

    # delete=False: the path is handed back to the caller after closing.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as tmp:
        tmp.write("\n".join(kept) + "\n")
    return pathlib.Path(tmp.name)
def load_cookies(cookies_path: pathlib.Path) -> http.cookiejar.MozillaCookieJar:
    """Load a Netscape cookies.txt into a cookie jar.

    When the file contains cookie lines for non-Facebook domains, it is first
    filtered down to the Facebook ones via ``filter_facebook_cookies``.

    Args:
        cookies_path: Path of the cookies.txt to load.

    Returns:
        The populated jar (discardable and expired cookies included).

    Raises:
        http.cookiejar.LoadError: if the file is not a valid cookies.txt.
    """
    jar = http.cookiejar.MozillaCookieJar()
    raw = cookies_path.read_text(encoding="utf-8", errors="ignore")
    has_non_fb = any(
        line.strip()
        and not line.strip().startswith("#")
        and "facebook" not in line.split("\t")[0].lower()
        for line in raw.splitlines()
        if "\t" in line
    )
    if not has_non_fb:
        jar.load(str(cookies_path), ignore_discard=True, ignore_expires=True)
        return jar

    filtered_path = filter_facebook_cookies(cookies_path)
    try:
        jar.load(str(filtered_path), ignore_discard=True, ignore_expires=True)
    finally:
        # The temporary copy holds session cookies; never leave it on disk.
        filtered_path.unlink(missing_ok=True)
    return jar
def scrape_with_requests(url: str, cookies_path: pathlib.Path | None) -> dict | None:
    """Fetch a Facebook post page and extract text, author, date and images.

    Facebook embeds the post in JSON blobs inside the HTML, so the data is
    pulled out with regular expressions.

    Args:
        url: Post URL to fetch.
        cookies_path: Netscape-format cookies file, or None to fetch
            anonymously.

    Returns:
        A dict with the keys ``post_url``, ``original_request_url``, ``text``,
        ``username``, ``author_url``, ``time`` (``YYYY-MM-DD``) and ``images``
        (at most 10 URLs). None when the request fails, the HTTP status is not
        200, or fewer than 10 characters of post text were found; in the last
        case the HTML is saved next to this script as ``debug-fb-last.html``.

    Exits the process with status 1 when requests / beautifulsoup4 are not
    installed.
    """
    # `html` is used below as the variable holding the page source, so the
    # stdlib function is imported under a name that cannot clash with it.
    from html import unescape as unescape_entities

    try:
        import requests
        from bs4 import BeautifulSoup
    except ImportError:
        print("❌ 請先安裝: pip install requests beautifulsoup4")
        sys.exit(1)

    session = requests.Session()
    session.headers.update({
        "User-Agent": (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        ),
        "Accept-Language": "zh-TW,zh;q=0.9,en;q=0.8",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-User": "?1",
        "Sec-Fetch-Dest": "document",
    })
    if cookies_path and cookies_path.exists():
        session.cookies = load_cookies(cookies_path)
        print(f"🍪 使用 cookies: {cookies_path}")
    else:
        print("⚠️ 未提供 cookies,可能只能抓到部分資訊")

    print(f"🌐 抓取: {url}")
    try:
        resp = session.get(url, timeout=20, allow_redirects=True)
    except Exception as e:
        print(f"❌ 請求失敗: {e}")
        return None
    if resp.status_code != 200:
        print(f"❌ HTTP {resp.status_code}")
        return None

    html = resp.text
    # NOTE(review): the parsed tree is never read below; kept only so the
    # function's dependencies stay as they were.
    soup = BeautifulSoup(html, "html.parser")

    def meta_content(prop: str) -> str:
        """Return the entity-decoded content of <meta property=prop>, or ""."""
        for pattern in (
            rf'<meta[^>]+property="{prop}"[^>]+content="([^"]+)"',
            rf'<meta[^>]+content="([^"]+)"[^>]+property="{prop}"',
        ):
            found = re.search(pattern, html)
            if found:
                return unescape_entities(found.group(1)).strip()
        return ""

    def decode_json_text(raw: str) -> str:
        """Decode the inside of a JSON string literal, best effort."""
        try:
            return json.loads(f'"{raw}"')
        except ValueError:
            return raw.replace("\\n", "\n").replace('\\"', '"')

    def author_from_title(raw_title: str) -> str | None:
        """Return the part before the first "-" or "|" if it looks like a name."""
        first_part = re.split(r'\s*[-|]\s*', raw_title, maxsplit=1)[0].strip()
        if first_part and len(first_part) <= 60 and first_part.lower() not in ("error", "facebook"):
            return first_part
        return None

    def timestamp_to_date(ts: int) -> str | None:
        """Format a Unix timestamp, accepting only 2010-01-01 .. 2030-01-01 (UTC)."""
        if 1262304000 < ts < 1893456000:
            return datetime.fromtimestamp(ts).strftime("%Y-%m-%d")
        return None

    title_match = re.search(r'<title>([^<]+)</title>', html)
    page_title = unescape_entities(title_match.group(1)).strip() if title_match else ""

    # Step 1: og:description is the start of the target post (most reliable).
    og_desc = meta_content("og:description")
    # Without og:description, use the part of <title> after the first "-".
    title_anchor = ""
    if not og_desc and page_title:
        parts = re.split(r'\s*-\s*', page_title, maxsplit=1)
        if len(parts) > 1:
            title_anchor = parts[1].rstrip(".").strip()
    anchor = og_desc or title_anchor

    # Step 2: find message.text in the JSON; with an anchor, accept only a
    # message that contains the anchor's beginning near its start.
    precise_patterns = [
        r'"message"\s*:\s*\{"text"\s*:\s*"((?:[^"\\]|\\.)*)"\}',
        r'"story_message"\s*:\s*\{"text"\s*:\s*"((?:[^"\\]|\\.)*)"\}',
        r'"body"\s*:\s*\{"text"\s*:\s*"((?:[^"\\]|\\.)*)"\}',
    ]
    text = ""
    if anchor:
        anchor_start = anchor[:40].lower().strip()
        for pattern in precise_patterns:
            for m in re.finditer(pattern, html):
                decoded = decode_json_text(m.group(1))
                if len(decoded) > 20 and anchor_start in decoded.lower()[:120]:
                    text = decoded
                    break
            if text:
                break
    else:
        # No anchor at all: take the longest message found.
        candidates = []
        for pattern in precise_patterns:
            for m in re.finditer(pattern, html):
                decoded = decode_json_text(m.group(1))
                if len(decoded) > 50:
                    candidates.append(decoded)
        if candidates:
            text = max(candidates, key=len)

    # Step 3: fall back to og:description itself.
    if not text:
        text = og_desc

    # Author: og:title, then <title>, then the URL slug, then JSON actor/owner.
    # The JSON names come last because they may belong to the logged-in user.
    author = (
        author_from_title(meta_content("og:title"))
        or author_from_title(page_title)
        or "unknown"
    )
    if author == "unknown":
        url_match = re.search(r'facebook\.com/([a-zA-Z0-9._]+)/posts/', html)
        if url_match:
            author = url_match.group(1).replace(".", " ").title()
    if author == "unknown":
        for pattern in [
            r'"actor"\s*:\s*\{[^}]*"name"\s*:\s*"([^"]{2,60})"',
            r'"owner"\s*:\s*\{[^}]*"name"\s*:\s*"([^"]{2,60})"',
        ]:
            m = re.search(pattern, html, re.DOTALL)
            if m:
                candidate = m.group(1)
                try:
                    candidate = json.loads(f'"{candidate}"')
                except ValueError:
                    pass  # keep the raw, still-escaped name
                # Skip identifiers such as "fooBundle" that are not names.
                if not re.match(r'^[a-z][a-zA-Z]+(?:Bundle|Worker|Module|Script)$', candidate):
                    author = candidate
                    break

    # Publish time: locate the post text in the page and look for a
    # timestamp shortly before it or within the following 100,000 characters.
    time_patterns = [
        r'"publish_time"\s*:\s*(\d{10})',
        r'\\"publish_time\\":\s*(\d{10})',
        r'"creation_time"\s*:\s*(\d{10})',
        r'\\"creation_time\\":\s*(\d{10})',
    ]

    def find_all(needle: str) -> list:
        """Return every start index of *needle* in the page source."""
        found = []
        start = 0
        while True:
            pos = html.find(needle, start)
            if pos == -1:
                return found
            found.append(pos)
            start = pos + 1

    def find_time_near(anchor_text: str, search_range: int = 5000) -> str | None:
        if not anchor_text:
            return None
        needle = anchor_text[:20]
        # Search both the literal text and its JSON-escaped form.
        positions = find_all(needle) + find_all(json.dumps(needle)[1:-1])
        if not positions:
            return None

        def inside_script(p: int) -> bool:
            window = html[max(0, p - 200):p]
            return window.rfind('<script') > window.rfind('</script')

        script_positions = [p for p in positions if inside_script(p)]
        search_positions = script_positions if script_positions else positions
        for pos in sorted(search_positions):
            chunk_before = html[max(0, pos - search_range): pos]
            for pattern in time_patterns:
                matches = list(re.finditer(pattern, chunk_before))
                if matches:
                    # The closest timestamp before the text is the last match.
                    date = timestamp_to_date(int(matches[-1].group(1)))
                    if date:
                        return date
            chunk_after = html[pos: pos + 100000]
            for pattern in time_patterns:
                m = re.search(pattern, chunk_after)
                if m:
                    date = timestamp_to_date(int(m.group(1)))
                    if date:
                        return date
        return None

    published = find_time_near(text) if text else None
    if published is None:
        # Nothing near the text: take the first plausible timestamp anywhere.
        for pattern in time_patterns:
            m = re.search(pattern, html)
            if m:
                date = timestamp_to_date(int(m.group(1)))
                if date:
                    published = date
                    break
    if published is None:
        published = datetime.now().strftime("%Y-%m-%d")

    # Images: CometSinglePost preload links, skipping profile pictures
    # (t39.30808-1).
    images = []
    seen_urls = set()
    for m in re.finditer(
        r'<link\b(?=[^>]*data-preloader="[^"]*CometSinglePost[^"]*")[^>]*\bhref="([^"]+)"[^>]*/?>',
        html,
    ):
        # href values are HTML-escaped ("&amp;" between query parameters).
        raw_url = unescape_entities(m.group(1))
        if "fbcdn.net" in raw_url and "t39.30808-1" not in raw_url and raw_url not in seen_urls:
            seen_urls.add(raw_url)
            images.append(raw_url)
    if not images:
        for m in re.finditer(
            r'"uri"\s*:\s*"(https:(?:\\/|/)[^"]*\.fbcdn\.net[^"]+)"',
            html,
        ):
            raw_url = m.group(1).replace("\\/", "/")
            if any(x in raw_url for x in ("emoji", "icon", "rsrc.php", "safe_image", "static", "t1.6435", "t39.30808-1")):
                continue
            if raw_url not in seen_urls:
                seen_urls.add(raw_url)
                images.append(raw_url)

    if not text or len(text) < 10:
        print("⚠️ 未能解析到貼文內容(Facebook 可能需要登入或已更改頁面結構)")
        debug_path = pathlib.Path(__file__).resolve().parent / "debug-fb-last.html"
        debug_path.write_text(html, encoding="utf-8")
        print(f" 已儲存 HTML 到 {debug_path} 供分析")
        return None

    slug_match = re.search(r'facebook\.com/([^/?]+)/posts/', url)
    author_url = f"https://www.facebook.com/{slug_match.group(1)}" if slug_match else ""
    return {
        "post_url": url,
        "original_request_url": url,
        "text": text,
        "username": author,
        "author_url": author_url,
        "time": published,
        "images": images[:10],
    }
def manual_input_fallback(url: str) -> dict:
    """Ask the user to type or paste the post data on stdin.

    Prompts for the author and the publish date, then reads the post text
    line by line until a line consisting of ``END``. End of input (Ctrl-D, or
    piped stdin running out) also ends the text instead of raising EOFError.

    Args:
        url: Post URL, stored unchanged in the result.

    Returns:
        A dict with the keys ``post_url``, ``original_request_url``, ``text``,
        ``username``, ``time`` and ``images`` (always empty).
    """
    print()
    print("=" * 60)
    print("📋 手動輸入模式")
    print(" 請在 Facebook 開啟貼文,複製貼文內容後貼到這裡")
    print(" 輸入完畢後按 Enter 再輸入 END 結束")
    print("=" * 60)
    author = input("作者名稱: ").strip() or "unknown"
    date_str = input("發文日期(YYYY-MM-DD,留空用今天): ").strip()
    if not date_str:
        date_str = datetime.now().strftime("%Y-%m-%d")
    print("貼文內容(輸入 END 結束):")
    lines = []
    while True:
        try:
            line = input()
        except EOFError:
            # Keep whatever was entered so far.
            break
        if line.strip() == "END":
            break
        lines.append(line)
    return {
        "post_url": url,
        "original_request_url": url,
        "text": "\n".join(lines),
        "username": author,
        "time": date_str,
        "images": [],
    }
| # ── main ───────────────────────────────────────────────── | |
def main():
    """Command-line entry point: fetch (or ask for) a post and save it as a note.

    The note is written to ``CLIPPINGS_DIR``; with ``--dry-run`` it is only
    printed. If a note with the same name exists, ``-1``, ``-2``, ... is
    appended to the file name.
    """
    parser = argparse.ArgumentParser(
        description="將 Facebook public post 存成 Obsidian Clippings 筆記"
    )
    parser.add_argument("url", help="Facebook 貼文 URL")
    parser.add_argument(
        "--cookies",
        default=None,
        help="cookies.txt 路徑(Netscape 格式,預設找 scripts/www.facebook.com_cookies.txt)",
    )
    parser.add_argument(
        "--manual",
        action="store_true",
        help="跳過自動抓取,直接手動輸入內容",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="只印出結果,不寫入檔案",
    )
    args = parser.parse_args()

    # An explicit --cookies wins; otherwise use the default file if present.
    if args.cookies:
        cookies_path = pathlib.Path(args.cookies).expanduser()
    elif DEFAULT_COOKIES.exists():
        cookies_path = DEFAULT_COOKIES
    else:
        cookies_path = None

    print(f"🔗 URL: {args.url}")
    if args.manual:
        post = manual_input_fallback(args.url)
    else:
        post = scrape_with_requests(args.url, cookies_path)
        if not post or not (post.get("text") or post.get("post_text")):
            print()
            print("⚠️ 自動抓取未取得內容,切換到手動輸入模式...")
            post = manual_input_fallback(args.url)
    post["post_url"] = args.url
    post["original_request_url"] = args.url

    filename, markdown = build_markdown(post)
    output_path = CLIPPINGS_DIR / filename
    print()
    print("─" * 60)
    print(markdown)
    print("─" * 60)
    if args.dry_run:
        print(f"[DRY RUN] 不寫入檔案(預計路徑: {output_path})")
        return

    # A fresh vault may not have the folder yet; write_text would fail.
    CLIPPINGS_DIR.mkdir(parents=True, exist_ok=True)
    if output_path.exists():
        stem = output_path.stem
        suffix = output_path.suffix
        counter = 1
        while output_path.exists():
            output_path = CLIPPINGS_DIR / f"{stem}-{counter}{suffix}"
            counter += 1
    output_path.write_text(markdown, encoding="utf-8")
    print(f"✅ 已儲存: {output_path.relative_to(VAULT)}")


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| 下載 Obsidian 筆記中的外部圖片 → 上傳到 R2 → 改寫 URL | |
| 用法: | |
| python3 scripts/migrate-external-images.py --dry-run # 只列出不執行 | |
| python3 scripts/migrate-external-images.py --folder Notion/ # 只處理特定資料夾 | |
| python3 scripts/migrate-external-images.py --file path/to.md # 指定單一檔案 | |
| python3 scripts/migrate-external-images.py # 全量執行 | |
| 設定(修改下方常數): | |
| OWN_DOMAIN 你的圖床 domain(已在此 domain 的圖片會跳過) | |
| API PicList server URL(含 picbed、configName、key 參數) | |
| 架構: | |
| Obsidian → PicList Docker (NAS/server) | |
| → Cloudflare R2 (or any S3-compatible storage) | |
| → https://your-cdn-domain/ | |
| 處理邏輯: | |
| - 掃描 ![alt](https://...) 格式(支援有/無副檔名) | |
| - 跳過已指向自有圖床(OWN_DOMAIN)的圖片 | |
| - 透過 PicList API 讓 server 下載外部圖片並上傳到 R2 | |
| - 替換成功後改寫 md 檔案,全部完成才記錄到 done.log | |
| 日誌: | |
| scripts/ext-migrate-output.log 執行紀錄 | |
| scripts/ext-migrate-done.log 已完成的筆記(跳過重複處理) | |
| scripts/ext-migrate-failed.log 失敗的圖片 URL | |
| """ | |
| import re, json, sys, time, argparse, urllib.request, pathlib, os | |
| from urllib.parse import urlsplit, urlunsplit, quote, unquote | |
# Emit UTF-8 on the standard streams regardless of the console's default.
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
# Vault root: the parent of the directory that contains this script.
VAULT = pathlib.Path(__file__).resolve().parent.parent
# PicList API endpoint, format:
#   http://<host>:<port>/upload?picbed=<uploader>&configName=<name>&key=<secret>
API = "http://YOUR_PICLIST_HOST:36677/upload?picbed=aws-s3-plist&configName=R2&key=YOUR_SECRET_KEY"
# Images already on this domain are skipped (set this to your image host).
OWN_DOMAIN = "your-cdn-domain.example.com"
DONE_LOG = VAULT / "scripts" / "ext-migrate-done.log"
FAILED_LOG = VAULT / "scripts" / "ext-migrate-failed.log"
OUTPUT_LOG = VAULT / "scripts" / "ext-migrate-output.log"
IMG_EXT = r"png|jpe?g|gif|webp|svg|bmp|avif|tiff|ico"
# Matches the following forms (never a URL on OWN_DOMAIN or a subdomain of it):
#   1. ![alt](https://host/image.png)
#   2. ![alt](https://host/image.png?query)
#   3. ![alt](https://host/image.png "title")
#   4. ![alt](https://host/path)   no file extension (Yahoo / CDN proxies etc.)
# Group 1 is the "![alt]" part, group 2 the URL.
EXTERNAL_IMG_RE = re.compile(
    r'(!\[[^\]]*\])\((https?://(?!(?:[^/]*\.)?' + re.escape(OWN_DOMAIN) + r')[^)\s"]+)'
    r'(?:'
    r'\.(?:' + IMG_EXT + r')(?:\?[^)\s"]*)?'  # with a file extension
    r'|'
    r'(?:/[^)\s"]*)?'  # without a file extension (ends in a path)
    r')\s*(?:"[^"]*")?\)',
    re.IGNORECASE,
)
# Directory names that are never scanned.
SKIP_DIRS = {".trash", ".obsidian", "Templates", "scripts", ".git"}
# Handle of the run log; main() assigns it, log() writes to it when set.
_log_fh = None
def log(msg=""):
    """Print *msg* and, once the run log is open, append it there too."""
    print(msg, flush=True)
    if not _log_fh:
        return
    _log_fh.write(msg + "\n")
    _log_fh.flush()
def encode_url(url: str) -> str:
    """Percent-encode the characters of *url* that are not valid in a URL.

    Non-ASCII characters (e.g. Chinese file names) and spaces in the path,
    query and fragment are encoded as UTF-8 percent escapes. Escapes that are
    already present are left untouched, so ``%2F`` stays ``%2F`` rather than
    turning into a path separator. A ``%`` that does not start a valid escape
    is encoded as ``%25``. Scheme and host are not modified.
    """
    path_safe = "/:@!$&'()*+,;=-._~%"
    query_safe = path_safe + "?"

    def encode(component: str, safe: str) -> str:
        # "%" is in `safe` to keep existing escapes; fix stray ones first.
        component = re.sub(r'%(?![0-9A-Fa-f]{2})', '%25', component)
        return quote(component, safe=safe)

    parts = urlsplit(url)
    return urlunsplit((
        parts.scheme,
        parts.netloc,
        encode(parts.path, path_safe),
        encode(parts.query, query_safe),
        encode(parts.fragment, query_safe),
    ))
def upload_url_to_r2(url: str):
    """Hand an external image URL to PicList, which downloads and uploads it.

    Returns the first entry of the API's ``result`` list, or None when the
    request fails or the response does not report success (both are logged).
    """
    body = json.dumps({"list": [encode_url(url)]}).encode()
    request = urllib.request.Request(
        API,
        data=body,
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=60) as response:
            reply = json.loads(response.read())
        if not (reply.get("success") and reply.get("result")):
            log(f" API 失敗: {reply}")
            return None
        return reply["result"][0]
    except Exception as e:
        log(f" API 錯誤: {e}")
        return None
def load_lines(path: pathlib.Path) -> set:
    """Return the lines of the UTF-8 file *path* as a set; empty if it is missing."""
    if not path.exists():
        return set()
    content = path.read_text(encoding="utf-8")
    return set(content.splitlines())
def main():
    """Command-line entry point: move external images of the notes to R2.

    Scans the selected notes for external image links, uploads each image
    through the PicList API and rewrites the note to use the returned URL.
    A note is appended to DONE_LOG once no external image is left in it;
    failed uploads are appended to FAILED_LOG.
    """
    global _log_fh
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--folder", default="")
    parser.add_argument("--file", default="", help="指定單一 md 檔案(相對於 vault 的路徑)")
    args = parser.parse_args()

    _log_fh = open(OUTPUT_LOG, "w", encoding="utf-8")
    try:
        done_set = load_lines(DONE_LOG)
        if args.file:
            md_file = pathlib.Path(args.file)
            if not md_file.is_absolute():
                md_file = VAULT / args.file
            if not md_file.exists():
                log(f"找不到檔案: {md_file}")
                return
            candidates = [md_file]
        else:
            search_root = VAULT / args.folder if args.folder else VAULT
            candidates = sorted(search_root.rglob("*.md"))

        # Collect the notes that still contain external images.
        targets = []
        for md_file in candidates:
            rel = str(md_file.relative_to(VAULT))
            if any(part in SKIP_DIRS for part in pathlib.Path(rel).parts):
                continue
            if rel in done_set:
                continue
            content = md_file.read_text(encoding="utf-8")
            matches = EXTERNAL_IMG_RE.findall(content)
            if matches:
                # Longest first: replacing a URL that is a prefix of another
                # one before the longer URL would corrupt the longer one.
                urls = sorted({m[1] for m in matches}, key=lambda u: (-len(u), u))
                targets.append((md_file, rel, urls))

        total_imgs = sum(len(urls) for _, _, urls in targets)
        log("=== 外部圖片 → R2 批次遷移 ===")
        log(f"日誌: {OUTPUT_LOG}")
        log(f"共 {len(targets)} 篇筆記、{total_imgs} 個外部圖片")
        if args.dry_run:
            log("[DRY RUN] 只列出不實際執行")
        log()

        stats = {"ok": 0, "skip": 0, "fail": 0}
        for idx, (md_file, rel, urls) in enumerate(targets, 1):
            log(f"[{idx}/{len(targets)}] {rel} ({len(urls)} 張)")
            content = md_file.read_text(encoding="utf-8")
            changed = False
            for url in urls:
                if url not in content:
                    log(" ⏭ 已無此 URL,跳過")
                    stats["skip"] += 1
                    continue
                short_url = url[:80] + ("..." if len(url) > 80 else "")
                if args.dry_run:
                    log(f" → {short_url}")
                    continue
                remote_url = upload_url_to_r2(url)
                if not remote_url:
                    log(f" ✗ {short_url} (上傳失敗)")
                    with open(FAILED_LOG, "a", encoding="utf-8") as fh:
                        fh.write(f"{rel} | {url} | UPLOAD_FAILED\n")
                    stats["fail"] += 1
                    time.sleep(2)  # back off a little after a failure
                    continue
                new_content = content.replace(url, remote_url)
                if new_content != content:
                    content = new_content
                    changed = True
                    stats["ok"] += 1
                    log(f" ✓ {short_url}")
                else:
                    log(f" ⚠ 替換未生效: {short_url}")
                    stats["fail"] += 1
                time.sleep(0.5)  # throttle requests to the upload server

            if not args.dry_run and changed:
                md_file.write_text(content, encoding="utf-8")
                # Re-read to decide whether the note is completely migrated.
                verify = md_file.read_text(encoding="utf-8")
                remaining = len(EXTERNAL_IMG_RE.findall(verify))
                if remaining == 0:
                    log(" 📝 已寫入,外部圖片全部替換 ✓")
                    with open(DONE_LOG, "a", encoding="utf-8") as fh:
                        fh.write(rel + "\n")
                else:
                    log(f" 📝 已寫入,仍有 {remaining} 個外部圖片")
            log()

        log("=== 完成 ===")
        log(f"成功: {stats['ok']}")
        log(f"跳過: {stats['skip']}")
        log(f"失敗: {stats['fail']}")
        if stats["fail"] > 0:
            log(f"失敗紀錄: {FAILED_LOG}")
    finally:
        # Close the run log on every exit path, including exceptions.
        _log_fh.close()
        _log_fh = None


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| 批次上傳 Obsidian 本地附件到 Cloudflare R2 並改寫 .md 引用 | |
| 用法: | |
| python3 scripts/migrate-to-r2.py --dry-run # 只列出不執行 | |
| python3 scripts/migrate-to-r2.py --folder "Notion/" # 只處理特定資料夾 | |
| python3 scripts/migrate-to-r2.py --file path/to/note.md # 指定單一檔案 | |
| python3 scripts/migrate-to-r2.py # 全量執行 | |
| 設定(修改下方常數): | |
| ATTACH_DIR 本地附件目錄 | |
| API PicList server URL(含 picbed、configName、key 參數) | |
| 日誌自動寫到 scripts/migrate-output.log | |
| """ | |
| import re, json, time, argparse, urllib.request, pathlib, hashlib | |
# Vault root: the parent of the directory that contains this script.
VAULT = pathlib.Path(__file__).resolve().parent.parent
ATTACH_DIR = VAULT / "attachments"  # ← change to your attachments directory
# PicList API endpoint, format:
#   http://<host>:<port>/upload?picbed=<uploader>&configName=<name>&key=<secret>
API = "http://YOUR_PICLIST_HOST:36677/upload?picbed=aws-s3-plist&configName=R2&key=YOUR_SECRET_KEY"
DONE_LOG = VAULT / "scripts" / "migrate-done.log"
FAILED_LOG = VAULT / "scripts" / "migrate-failed.log"
OUTPUT_LOG = VAULT / "scripts" / "migrate-output.log"
# File extensions (regex alternation) that count as attachments.
EXTENSIONS = (
    "png|jpg|jpeg|gif|webp|svg|bmp|tiff|ico|avif|"
    "mp4|webm|ogv|mov|mkv|mp3|wav|ogg|m4a|flac|3gp|aac|pdf"
)
# Matches ![[file.ext]] or ![[file.ext|alias]]; captures the file name
# (without the |alias part).
WIKILINK_RE = re.compile(
    r'!\[\[([^|\]]+\.(?:' + EXTENSIONS + r'))(?:\|[^\]]*)?\]\]',
    re.IGNORECASE,
)
# Directory names that are never scanned.
SKIP_DIRS = {".trash", ".obsidian", "Templates", "scripts", ".git"}
# Handle of the run log; main() assigns it, log() writes to it when set.
_log_fh = None
# ── helpers ──────────────────────────────────────────────
def log(msg=""):
    """Echo *msg* to stdout and mirror it into the run log when one is open."""
    print(msg, flush=True)
    log_file = _log_fh
    if log_file:
        log_file.write(msg + "\n")
        log_file.flush()
def find_local_file(filename: str):
    """Locate an attachment by name.

    Looks in ATTACH_DIR first, then searches the whole vault recursively,
    ignoring anything inside one of SKIP_DIRS.

    Returns:
        The path of the first match, or None when the file is not found.
    """
    import glob

    candidate = ATTACH_DIR / filename
    if candidate.exists():
        return candidate
    # rglob() takes a glob pattern: escape "[", "*", "?" so names such as
    # "image [1].png" are matched literally.
    for p in VAULT.rglob(glob.escape(filename)):
        # Check only the parts below the vault, so a directory name above
        # the vault (e.g. ".../scripts/vault") cannot exclude everything.
        if not any(part in SKIP_DIRS for part in p.relative_to(VAULT).parts):
            return p
    return None
def file_md5(path: pathlib.Path) -> str:
    """Return the hexadecimal MD5 digest of the file at *path*.

    The file is hashed in 1 MiB chunks so large media files are never held
    in memory as a whole.
    """
    digest = hashlib.md5()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()
def upload_file(local_path: pathlib.Path):
    """Upload a local file through the PicList API.

    Returns:
        The first entry of the API's ``result`` list, or None when the
        request fails or the response does not report success. Both cases
        are logged.
    """
    payload = json.dumps({"list": [str(local_path)]}).encode()
    req = urllib.request.Request(
        API, data=payload, headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            data = json.loads(resp.read())
        if data.get("success") and data.get("result"):
            return data["result"][0]
        # Log the rejected response so a failure can be diagnosed.
        log(f" API 失敗: {data}")
    except Exception as e:
        log(f" API 錯誤: {e}")
    return None
def load_lines(path: pathlib.Path) -> set:
    """Read *path* as UTF-8 and return its distinct lines; empty set if absent."""
    return {
        line for line in path.read_text(encoding="utf-8").splitlines()
    } if path.exists() else set()
| # ── main ───────────────────────────────────────────────── | |
def main():
    """Command-line entry point: upload local attachments and rewrite notes.

    Every ``![[file.ext]]`` / ``![[file.ext|alias]]`` embed in the selected
    notes is uploaded through the PicList API and replaced by a Markdown
    image ``![<file stem>](<remote url>)``. A note is appended to DONE_LOG
    once no wikilink embed is left in it; failures go to FAILED_LOG.
    """
    global _log_fh
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true", help="只列出不實際執行")
    parser.add_argument("--folder", default="", help="只處理特定子資料夾")
    parser.add_argument("--file", default="", help="指定單一 md 檔案(相對於 vault 的路徑)")
    args = parser.parse_args()

    _log_fh = open(OUTPUT_LOG, "w", encoding="utf-8")
    try:
        done_set = load_lines(DONE_LOG)
        if args.file:
            md_file = pathlib.Path(args.file)
            if not md_file.is_absolute():
                md_file = VAULT / args.file
            if not md_file.exists():
                log(f"找不到檔案: {md_file}")
                return
            candidates = [md_file]
        else:
            search_root = VAULT / args.folder if args.folder else VAULT
            candidates = sorted(search_root.rglob("*.md"))

        # Collect the notes that still contain attachment embeds.
        targets = []
        for md_file in candidates:
            rel = str(md_file.relative_to(VAULT))
            if any(part in SKIP_DIRS for part in pathlib.Path(rel).parts):
                continue
            if rel in done_set:
                continue
            content = md_file.read_text(encoding="utf-8")
            filenames = sorted(set(WIKILINK_RE.findall(content)))
            if filenames:
                targets.append((md_file, rel, filenames))

        log("=== Obsidian → R2 批次遷移 ===")
        log(f"日誌: {OUTPUT_LOG}")
        total_imgs = sum(len(fns) for _, _, fns in targets)
        log(f"共 {len(targets)} 篇筆記、{total_imgs} 個附件引用需要處理")
        if args.dry_run:
            log("[DRY RUN] 只列出不實際執行")
        log()

        stats = {"ok": 0, "skip": 0, "fail": 0}
        for idx, (md_file, rel, filenames) in enumerate(targets, 1):
            log(f"[{idx}/{len(targets)}] {rel} ({len(filenames)} 個附件)")
            content = md_file.read_text(encoding="utf-8")
            changed = False
            for filename in filenames:
                pattern = re.compile(
                    r'!\[\[' + re.escape(filename) + r'(?:\|[^\]]*)?\]\]'
                )
                if not pattern.search(content):
                    log(f" ⏭ {filename} (已無 wikilink,跳過)")
                    stats["skip"] += 1
                    continue
                local_path = find_local_file(filename)
                if local_path is None:
                    log(f" ⚠ 找不到: {filename}")
                    # UTF-8 like the reader (load_lines); names may be non-ASCII.
                    with open(FAILED_LOG, "a", encoding="utf-8") as fh:
                        fh.write(f"{rel} | {filename} | FILE_NOT_FOUND\n")
                    stats["fail"] += 1
                    continue
                if args.dry_run:
                    log(f" → {filename} ({local_path})")
                    continue
                remote_url = upload_file(local_path)
                if not remote_url:
                    log(f" ✗ {filename} (上傳失敗)")
                    with open(FAILED_LOG, "a", encoding="utf-8") as fh:
                        fh.write(f"{rel} | {filename} | UPLOAD_FAILED\n")
                    stats["fail"] += 1
                    time.sleep(2)  # back off a little after a failure
                    continue
                alt = pathlib.Path(filename).stem
                replacement = f"![{alt}]({remote_url})"
                # A function replacement keeps backslashes in the text from
                # being interpreted as regex escapes / group references.
                new_content = pattern.sub(lambda _m: replacement, content)
                if new_content != content:
                    content = new_content
                    changed = True
                    stats["ok"] += 1
                    log(f" ✓ {filename}")
                else:
                    log(f" ⚠ {filename} 替換未生效(regex 未匹配)")
                    stats["fail"] += 1
                time.sleep(0.3)  # throttle requests to the upload server

            if not args.dry_run and changed:
                md_file.write_text(content, encoding="utf-8")
                # Re-read to decide whether the note is completely migrated.
                verify = md_file.read_text(encoding="utf-8")
                remaining = len(WIKILINK_RE.findall(verify))
                if remaining == 0:
                    log(" 📝 已寫入,wikilink 全部清除 ✓")
                    with open(DONE_LOG, "a", encoding="utf-8") as fh:
                        fh.write(rel + "\n")
                else:
                    log(f" 📝 已寫入,仍有 {remaining} 個 wikilink 未處理")
            log()

        log("=== 完成 ===")
        log(f"上傳成功: {stats['ok']}")
        log(f"已跳過: {stats['skip']}")
        log(f"失敗: {stats['fail']}")
        if stats["fail"] > 0:
            log(f"失敗紀錄: {FAILED_LOG}")
    finally:
        # Close the run log on every exit path, including exceptions.
        _log_fh.close()
        _log_fh = None


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment